diff --git a/Cargo.lock b/Cargo.lock index 9c4861f263..13d6ca93df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5630,6 +5630,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +[[package]] +name = "payment_methods" +version = "0.1.0" +dependencies = [ + "async-trait", + "common_utils", + "dyn-clone", + "hyperswitch_domain_models", + "storage_impl", +] + [[package]] name = "pem" version = "3.0.4" @@ -6633,6 +6644,7 @@ dependencies = [ "once_cell", "openidconnect", "openssl", + "payment_methods", "pm_auth", "quick-xml", "rand", diff --git a/crates/hyperswitch_domain_models/src/errors.rs b/crates/hyperswitch_domain_models/src/errors.rs index bd05e1a377..a611e2ba82 100644 --- a/crates/hyperswitch_domain_models/src/errors.rs +++ b/crates/hyperswitch_domain_models/src/errors.rs @@ -39,3 +39,13 @@ pub enum StorageError { #[error("RedisError: {0:?}")] RedisError(String), } + +impl StorageError { + pub fn is_db_not_found(&self) -> bool { + match self { + Self::DatabaseError(err) => matches!(err.current_context(), DatabaseError::NotFound), + Self::ValueNotFound(_) => true, + _ => false, + } + } +} diff --git a/crates/hyperswitch_domain_models/src/payment_methods.rs b/crates/hyperswitch_domain_models/src/payment_methods.rs index 5257015d74..762ab5bbf8 100644 --- a/crates/hyperswitch_domain_models/src/payment_methods.rs +++ b/crates/hyperswitch_domain_models/src/payment_methods.rs @@ -1,17 +1,18 @@ #[cfg(feature = "v2")] use api_models::payment_methods::PaymentMethodsData; +// specific imports because of using the macro +use common_enums::enums::MerchantStorageScheme; #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] use common_utils::{crypto::Encryptable, encryption::Encryption, types::keymanager::ToEncryptable}; use common_utils::{ crypto::OptionalEncryptableValue, errors::{CustomResult, ParsingError, ValidationError}, - pii, type_name, + id_type, pii, type_name, types::keymanager, }; -use diesel_models::enums as storage_enums; +use diesel_models::{enums as storage_enums, PaymentMethodUpdate}; use error_stack::ResultExt; use masking::{PeekInterface, Secret}; -// specific imports because of using the macro #[cfg(feature = "v2")] use rustc_hash::FxHashMap; #[cfg(feature = "v2")] @@ -21,7 +22,9 @@ use time::PrimitiveDateTime; #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] use crate::{address::Address, type_encryption::OptionalEncryptableJsonType}; use crate::{ + errors, mandates::{self, CommonMandateReference}, + merchant_key_store::MerchantKeyStore, type_encryption::{crypto_operation, AsyncLift, CryptoOperation}, }; @@ -45,8 +48,8 @@ impl VaultId { ))] #[derive(Clone, Debug)] pub struct PaymentMethod { - pub customer_id: common_utils::id_type::CustomerId, - pub merchant_id: common_utils::id_type::MerchantId, + pub customer_id: id_type::CustomerId, + pub merchant_id: id_type::MerchantId, pub payment_method_id: String, pub accepted_currency: Option>, pub scheme: Option, @@ -85,13 +88,13 @@ pub struct PaymentMethod { #[derive(Clone, Debug, router_derive::ToEncryption)] pub struct PaymentMethod { /// The identifier for the payment method. Using this recurring payments can be made - pub id: common_utils::id_type::GlobalPaymentMethodId, + pub id: id_type::GlobalPaymentMethodId, /// The customer id against which the payment method is saved - pub customer_id: common_utils::id_type::GlobalCustomerId, + pub customer_id: id_type::GlobalCustomerId, /// The merchant id against which the payment method is saved - pub merchant_id: common_utils::id_type::MerchantId, + pub merchant_id: id_type::MerchantId, pub created_at: PrimitiveDateTime, pub last_modified: PrimitiveDateTime, pub payment_method_type: Option, @@ -126,7 +129,7 @@ impl PaymentMethod { } #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - pub fn get_id(&self) -> &common_utils::id_type::GlobalPaymentMethodId { + pub fn get_id(&self) -> &id_type::GlobalPaymentMethodId { &self.id } @@ -582,16 +585,16 @@ impl super::behaviour::Conversion for PaymentMethod { #[cfg(feature = "v2")] #[derive(Clone, Debug, router_derive::ToEncryption)] pub struct PaymentMethodSession { - pub id: common_utils::id_type::GlobalPaymentMethodSessionId, - pub customer_id: common_utils::id_type::GlobalCustomerId, + pub id: id_type::GlobalPaymentMethodSessionId, + pub customer_id: id_type::GlobalCustomerId, #[encrypt(ty = Value)] pub billing: Option>, pub return_url: Option, pub psp_tokenization: Option, pub network_tokenization: Option, pub expires_at: PrimitiveDateTime, - pub associated_payment_methods: Option>, - pub associated_payment: Option, + pub associated_payment_methods: Option>, + pub associated_payment: Option, } #[cfg(feature = "v2")] @@ -685,6 +688,150 @@ impl super::behaviour::Conversion for PaymentMethodSession { } } +#[async_trait::async_trait] +pub trait PaymentMethodInterface { + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_payment_method( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &id_type::GlobalPaymentMethodId, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method_by_locker_id( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + locker_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + limit: Option, + ) -> CustomResult, errors::StorageError>; + + // Need to fix this once we start moving to v2 for payment method + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_payment_method_list_by_global_customer_id( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + id: &id_type::GlobalCustomerId, + limit: Option, + ) -> CustomResult, errors::StorageError>; + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[allow(clippy::too_many_arguments)] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError>; + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[allow(clippy::too_many_arguments)] + async fn find_payment_method_by_global_customer_id_merchant_id_status( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError>; + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult; + + async fn insert_payment_method( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: PaymentMethod, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + async fn update_payment_method( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: PaymentMethod, + payment_method_update: PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn delete_payment_method( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: PaymentMethod, + ) -> CustomResult; + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method_by_fingerprint_id( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + fingerprint_id: &str, + ) -> CustomResult; + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + state: &keymanager::KeyManagerState, + key_store: &MerchantKeyStore, + merchant_id: &id_type::MerchantId, + payment_method_id: &str, + ) -> CustomResult; +} + #[cfg(feature = "v2")] pub enum PaymentMethodsSessionUpdateEnum { GeneralUpdate { @@ -753,7 +900,7 @@ pub struct PaymentMethodsSessionUpdateInternal { #[cfg(test)] mod tests { #![allow(clippy::unwrap_used)] - use common_utils::id_type::MerchantConnectorAccountId; + use id_type::MerchantConnectorAccountId; use super::*; @@ -761,8 +908,8 @@ mod tests { mandate_data: Option, ) -> PaymentMethod { let payment_method = PaymentMethod { - customer_id: common_utils::id_type::CustomerId::default(), - merchant_id: common_utils::id_type::MerchantId::default(), + customer_id: id_type::CustomerId::default(), + merchant_id: id_type::MerchantId::default(), payment_method_id: String::from("abc"), accepted_currency: None, scheme: None, diff --git a/crates/payment_methods/Cargo.toml b/crates/payment_methods/Cargo.toml new file mode 100644 index 0000000000..409e445d7f --- /dev/null +++ b/crates/payment_methods/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "payment_methods" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[dependencies] +async-trait = "0.1.79" +dyn-clone = "1.0.17" + +common_utils = { version = "0.1.0", path = "../common_utils", features = ["signals", "async_ext", "logs", "metrics", "keymanager", "encryption_service"] } +hyperswitch_domain_models = { version = "0.1.0", path = "../hyperswitch_domain_models", default-features = false } +storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false } + +[lints] +workspace = true + +[features] +default = ["v1"] +v1 = ["hyperswitch_domain_models/v1", "storage_impl/v1", "common_utils/v1"] +v2 = [ "payment_methods_v2"] +payment_methods_v2 = [ "hyperswitch_domain_models/payment_methods_v2", "storage_impl/payment_methods_v2", "common_utils/payment_methods_v2"] \ No newline at end of file diff --git a/crates/payment_methods/src/lib.rs b/crates/payment_methods/src/lib.rs new file mode 100644 index 0000000000..266c62acc7 --- /dev/null +++ b/crates/payment_methods/src/lib.rs @@ -0,0 +1 @@ +pub mod state; diff --git a/crates/payment_methods/src/state.rs b/crates/payment_methods/src/state.rs new file mode 100644 index 0000000000..97bb7bb75b --- /dev/null +++ b/crates/payment_methods/src/state.rs @@ -0,0 +1,82 @@ +#[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") +))] +use common_utils::errors::CustomResult; +use common_utils::types::keymanager::KeyManagerState; +#[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") +))] +use hyperswitch_domain_models::{ + errors, merchant_account::MerchantAccount, payment_methods::PaymentMethod, +}; +use hyperswitch_domain_models::{ + merchant_key_store::MerchantKeyStore, payment_methods::PaymentMethodInterface, +}; +use storage_impl::{kv_router_store::KVRouterStore, DatabaseStore, MockDb, RouterStore}; + +#[async_trait::async_trait] +pub trait PaymentMethodsStorageInterface: + Send + Sync + dyn_clone::DynClone + PaymentMethodInterface + 'static +{ +} +dyn_clone::clone_trait_object!(PaymentMethodsStorageInterface); + +#[async_trait::async_trait] +impl PaymentMethodsStorageInterface for MockDb {} + +#[async_trait::async_trait] +impl PaymentMethodsStorageInterface for RouterStore {} + +#[async_trait::async_trait] +impl PaymentMethodsStorageInterface for KVRouterStore {} + +#[derive(Clone)] +pub struct PaymentMethodsState { + pub store: Box, + pub key_store: Option, + pub key_manager_state: KeyManagerState, +} +impl From<&PaymentMethodsState> for KeyManagerState { + fn from(state: &PaymentMethodsState) -> Self { + state.key_manager_state.clone() + } +} +#[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") +))] +impl PaymentMethodsState { + pub async fn find_payment_method( + &self, + key_store: &MerchantKeyStore, + merchant_account: &MerchantAccount, + payment_method_id: String, + ) -> CustomResult { + let db = &*self.store; + let key_manager_state = &(self.key_manager_state).clone(); + + match db + .find_payment_method( + key_manager_state, + key_store, + &payment_method_id, + merchant_account.storage_scheme, + ) + .await + { + Err(err) if err.current_context().is_db_not_found() => { + db.find_payment_method_by_locker_id( + key_manager_state, + key_store, + &payment_method_id, + merchant_account.storage_scheme, + ) + .await + } + Ok(pm) => Ok(pm), + Err(err) => Err(err), + } + } +} diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 8ca5333847..2012028169 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -148,6 +148,7 @@ router_derive = { version = "0.1.0", path = "../router_derive" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } scheduler = { version = "0.1.0", path = "../scheduler", default-features = false } storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false } +payment_methods = { version = "0.1.0", path = "../payment_methods", default-features = false } [build-dependencies] router_env = { version = "0.1.0", path = "../router_env", default-features = false } diff --git a/crates/router/src/core/customers.rs b/crates/router/src/core/customers.rs index 6699cf70db..e9d9ffe2d7 100644 --- a/crates/router/src/core/customers.rs +++ b/crates/router/src/core/customers.rs @@ -835,7 +835,10 @@ impl CustomerDeleteBridge for id_type::CustomerId { &pm.payment_method_id, ) .await - .switch()?; + .change_context(errors::CustomersErrorResponse::InternalServerError) + .attach_printable( + "failed to delete payment method while redacting customer details", + )?; } } Err(error) => { diff --git a/crates/router/src/core/payment_methods/tokenize/payment_method_executor.rs b/crates/router/src/core/payment_methods/tokenize/payment_method_executor.rs index 967dc77f7f..c9e2618744 100644 --- a/crates/router/src/core/payment_methods/tokenize/payment_method_executor.rs +++ b/crates/router/src/core/payment_methods/tokenize/payment_method_executor.rs @@ -282,7 +282,7 @@ impl CardNetworkTokenizeExecutor<'_, domain::TokenizePaymentMethodRequest> { ) .await .map_err(|err| match err.current_context() { - storage_impl::errors::StorageError::DatabaseError(err) + errors::DataStorageError::DatabaseError(err) if matches!( err.current_context(), diesel_models::errors::DatabaseError::NotFound @@ -292,7 +292,7 @@ impl CardNetworkTokenizeExecutor<'_, domain::TokenizePaymentMethodRequest> { message: "Invalid payment_method_id".into(), }) } - storage_impl::errors::StorageError::ValueNotFound(_) => { + errors::DataStorageError::ValueNotFound(_) => { report!(errors::ApiErrorResponse::InvalidRequestData { message: "Invalid payment_method_id".to_string(), }) diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 70bc533f9a..48d48e380b 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -29,7 +29,6 @@ pub mod merchant_connector_account; pub mod merchant_key_store; pub mod organization; pub mod payment_link; -pub mod payment_method; pub mod payment_method_session; pub mod refund; pub mod relay; @@ -42,19 +41,21 @@ pub mod user_authentication_method; pub mod user_key_store; pub mod user_role; +use ::payment_methods::state::PaymentMethodsStorageInterface; use common_utils::id_type; use diesel_models::{ fraud_check::{FraudCheck, FraudCheckUpdate}, organization::{Organization, OrganizationNew, OrganizationUpdate}, }; use error_stack::ResultExt; -use hyperswitch_domain_models::payments::{ - payment_attempt::PaymentAttemptInterface, payment_intent::PaymentIntentInterface, -}; #[cfg(feature = "payouts")] use hyperswitch_domain_models::payouts::{ payout_attempt::PayoutAttemptInterface, payouts::PayoutsInterface, }; +use hyperswitch_domain_models::{ + payment_methods::PaymentMethodInterface, + payments::{payment_attempt::PaymentAttemptInterface, payment_intent::PaymentIntentInterface}, +}; #[cfg(not(feature = "payouts"))] use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface}; use masking::PeekInterface; @@ -110,7 +111,7 @@ pub trait StorageInterface: + merchant_connector_account::MerchantConnectorAccountInterface + PaymentAttemptInterface + PaymentIntentInterface - + payment_method::PaymentMethodInterface + + PaymentMethodInterface + blocklist::BlocklistInterface + blocklist_fingerprint::BlocklistFingerprintInterface + dynamic_routing_stats::DynamicRoutingStatsInterface @@ -165,7 +166,10 @@ pub trait AccountsStorageInterface: } pub trait CommonStorageInterface: - StorageInterface + GlobalStorageInterface + AccountsStorageInterface + StorageInterface + + GlobalStorageInterface + + AccountsStorageInterface + + PaymentMethodsStorageInterface { fn get_storage_interface(&self) -> Box; fn get_global_storage_interface(&self) -> Box; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 84a90b2b99..7c2a9c7498 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use ::payment_methods::state::PaymentMethodsStorageInterface; use common_enums::enums::MerchantStorageScheme; use common_utils::{ errors::CustomResult, @@ -21,6 +22,7 @@ use hyperswitch_domain_models::payouts::{ }; use hyperswitch_domain_models::{ disputes, + payment_methods::PaymentMethodInterface, payments::{payment_attempt::PaymentAttemptInterface, payment_intent::PaymentIntentInterface}, refunds, }; @@ -75,7 +77,6 @@ use crate::{ merchant_connector_account::{ConnectorAccessToken, MerchantConnectorAccountInterface}, merchant_key_store::MerchantKeyStoreInterface, payment_link::PaymentLinkInterface, - payment_method::PaymentMethodInterface, refund::RefundInterface, reverse_lookup::ReverseLookupInterface, routing_algorithm::RoutingAlgorithmInterface, @@ -2050,7 +2051,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, payment_method_id: &str, storage_scheme: MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .find_payment_method(state, key_store, payment_method_id, storage_scheme) .await @@ -2063,7 +2064,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, payment_method_id: &id_type::GlobalPaymentMethodId, storage_scheme: MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .find_payment_method(state, key_store, payment_method_id, storage_scheme) .await @@ -2080,7 +2081,7 @@ impl PaymentMethodInterface for KafkaStore { customer_id: &id_type::CustomerId, merchant_id: &id_type::MerchantId, limit: Option, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { self.diesel_store .find_payment_method_by_customer_id_merchant_id_list( state, @@ -2099,7 +2100,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, id: &id_type::GlobalCustomerId, limit: Option, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { self.diesel_store .find_payment_method_list_by_global_customer_id(state, key_store, id, limit) .await @@ -2118,7 +2119,7 @@ impl PaymentMethodInterface for KafkaStore { status: common_enums::PaymentMethodStatus, limit: Option, storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { self.diesel_store .find_payment_method_by_customer_id_merchant_id_status( state, @@ -2142,7 +2143,7 @@ impl PaymentMethodInterface for KafkaStore { status: common_enums::PaymentMethodStatus, limit: Option, storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::DataStorageError> { self.diesel_store .find_payment_method_by_global_customer_id_merchant_id_status( state, @@ -2165,7 +2166,7 @@ impl PaymentMethodInterface for KafkaStore { customer_id: &id_type::CustomerId, merchant_id: &id_type::MerchantId, status: common_enums::PaymentMethodStatus, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .get_payment_method_count_by_customer_id_merchant_id_status( customer_id, @@ -2185,7 +2186,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, locker_id: &str, storage_scheme: MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .find_payment_method_by_locker_id(state, key_store, locker_id, storage_scheme) .await @@ -2197,7 +2198,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, m: domain::PaymentMethod, storage_scheme: MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .insert_payment_method(state, key_store, m, storage_scheme) .await @@ -2210,7 +2211,7 @@ impl PaymentMethodInterface for KafkaStore { payment_method: domain::PaymentMethod, payment_method_update: storage::PaymentMethodUpdate, storage_scheme: MerchantStorageScheme, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .update_payment_method( state, @@ -2232,7 +2233,7 @@ impl PaymentMethodInterface for KafkaStore { key_store: &domain::MerchantKeyStore, merchant_id: &id_type::MerchantId, payment_method_id: &str, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .delete_payment_method_by_merchant_id_payment_method_id( state, @@ -2249,7 +2250,7 @@ impl PaymentMethodInterface for KafkaStore { state: &KeyManagerState, key_store: &domain::MerchantKeyStore, payment_method: domain::PaymentMethod, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .delete_payment_method(state, key_store, payment_method) .await @@ -2261,7 +2262,7 @@ impl PaymentMethodInterface for KafkaStore { state: &KeyManagerState, key_store: &domain::MerchantKeyStore, fingerprint_id: &str, - ) -> CustomResult { + ) -> CustomResult { self.diesel_store .find_payment_method_by_fingerprint_id(state, key_store, fingerprint_id) .await @@ -3180,6 +3181,8 @@ impl StorageInterface for KafkaStore { impl GlobalStorageInterface for KafkaStore {} impl AccountsStorageInterface for KafkaStore {} +impl PaymentMethodsStorageInterface for KafkaStore {} + impl CommonStorageInterface for KafkaStore { fn get_storage_interface(&self) -> Box { Box::new(self.clone()) diff --git a/crates/router/src/db/payment_method.rs b/crates/router/src/db/payment_method.rs deleted file mode 100644 index cc5af40b30..0000000000 --- a/crates/router/src/db/payment_method.rs +++ /dev/null @@ -1,1833 +0,0 @@ -use common_utils::{ext_traits::AsyncExt, id_type, types::keymanager::KeyManagerState}; -use diesel_models::payment_method::PaymentMethodUpdateInternal; -use error_stack::ResultExt; -use hyperswitch_domain_models::behaviour::{Conversion, ReverseConversion}; - -use super::MockDb; -use crate::{ - core::errors::{self, CustomResult}, - types::{ - domain, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, -}; - -#[async_trait::async_trait] -pub trait PaymentMethodInterface { - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &str, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &id_type::GlobalPaymentMethodId, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method_by_locker_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - locker_id: &str, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method_by_customer_id_merchant_id_list( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - limit: Option, - ) -> CustomResult, errors::StorageError>; - - // Need to fix this once we start moving to v2 for payment method - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_payment_method_list_by_global_customer_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - id: &id_type::GlobalCustomerId, - limit: Option, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[allow(clippy::too_many_arguments)] - async fn find_payment_method_by_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[allow(clippy::too_many_arguments)] - async fn find_payment_method_by_global_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError>; - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn get_payment_method_count_by_customer_id_merchant_id_status( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - ) -> CustomResult; - - async fn insert_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn delete_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - ) -> CustomResult; - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method_by_fingerprint_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - fingerprint_id: &str, - ) -> CustomResult; - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn delete_payment_method_by_merchant_id_payment_method_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - merchant_id: &id_type::MerchantId, - payment_method_id: &str, - ) -> CustomResult; -} - -#[cfg(feature = "kv_store")] -mod storage { - use common_utils::{ - fallback_reverse_lookup_not_found, id_type, types::keymanager::KeyManagerState, - }; - use diesel_models::{kv, PaymentMethodUpdateInternal}; - use error_stack::{report, ResultExt}; - use hyperswitch_domain_models::behaviour::{Conversion, ReverseConversion}; - use redis_interface::HsetnxReply; - use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{ - decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, - }; - - use super::PaymentMethodInterface; - use crate::{ - connection, - core::errors::{self, utils::RedisErrorExt, CustomResult}, - db::reverse_lookup::ReverseLookupInterface, - services::Store, - types::{ - domain, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, - utils::db_utils, - }; - - #[async_trait::async_trait] - impl PaymentMethodInterface for Store { - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &str, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = - Box::pin(decide_storage_scheme::<_, storage_types::PaymentMethod>( - self, - storage_scheme, - Op::Find, - )) - .await; - let get_pm = || async { - match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let lookup_id = format!("payment_method_{}", payment_method_id); - let lookup = fallback_reverse_lookup_not_found!( - self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) - .await, - database_call().await - ); - - let key = PartitionKey::CombinationKey { - combination: &lookup.pk_id, - }; - - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - Box::pin(kv_wrapper( - self, - KvOperation::::HGet( - &lookup.sk_id, - ), - key, - )) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - } - }; - - get_pm() - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - #[instrument(skip_all)] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &id_type::GlobalPaymentMethodId, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::PaymentMethod::find_by_id(&conn, payment_method_id) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = - Box::pin(decide_storage_scheme::<_, storage_types::PaymentMethod>( - self, - storage_scheme, - Op::Find, - )) - .await; - let get_pm = || async { - match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let lookup_id = - format!("payment_method_{}", payment_method_id.get_string_repr()); - let lookup = fallback_reverse_lookup_not_found!( - self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) - .await, - database_call().await - ); - - let key = PartitionKey::CombinationKey { - combination: &lookup.pk_id, - }; - - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - kv_wrapper( - self, - KvOperation::::HGet( - &lookup.sk_id, - ), - key, - ) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - } - }; - - get_pm() - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_locker_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - locker_id: &str, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - let storage_scheme = - Box::pin(decide_storage_scheme::<_, storage_types::PaymentMethod>( - self, - storage_scheme, - Op::Find, - )) - .await; - let get_pm = || async { - match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let lookup_id = format!("payment_method_locker_{}", locker_id); - let lookup = fallback_reverse_lookup_not_found!( - self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) - .await, - database_call().await - ); - - let key = PartitionKey::CombinationKey { - combination: &lookup.pk_id, - }; - - Box::pin(db_utils::try_redis_get_else_try_database_get( - async { - Box::pin(kv_wrapper( - self, - KvOperation::::HGet( - &lookup.sk_id, - ), - key, - )) - .await? - .try_into_hget() - }, - database_call, - )) - .await - } - } - }; - - get_pm() - .await? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - // not supported in kv - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn get_payment_method_count_by_customer_id_merchant_id_status( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - #[instrument(skip_all)] - async fn insert_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method_new = payment_method - .construct_new() - .await - .change_context(errors::StorageError::DecryptionError)?; - - let conn = connection::pg_connection_write(self).await?; - payment_method_new - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn insert_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let storage_scheme = - Box::pin(decide_storage_scheme::<_, storage_types::PaymentMethod>( - self, - storage_scheme, - Op::Insert, - )) - .await; - - let mut payment_method_new = payment_method - .construct_new() - .await - .change_context(errors::StorageError::DecryptionError)?; - - payment_method_new.update_storage_scheme(storage_scheme); - let pm = match storage_scheme { - MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - payment_method_new - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - MerchantStorageScheme::RedisKv => { - let merchant_id = payment_method_new.merchant_id.clone(); - let customer_id = payment_method_new.customer_id.clone(); - - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; - let key_str = key.to_string(); - let field = format!("payment_method_id_{}", payment_method_new.get_id()); - - let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { - sk_id: field.clone(), - pk_id: key_str.clone(), - lookup_id: v, - source: "payment_method".to_string(), - updated_by: storage_scheme.to_string(), - }; - - let lookup_id1 = format!("payment_method_{}", payment_method_new.get_id()); - let mut reverse_lookups = vec![lookup_id1]; - if let Some(locker_id) = &payment_method_new.locker_id { - reverse_lookups.push(format!("payment_method_locker_{}", locker_id)) - } - - let results = reverse_lookups.into_iter().map(|v| { - self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme) - }); - - futures::future::try_join_all(results).await?; - - let storage_payment_method = (&payment_method_new).into(); - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Insert { - insertable: Box::new(kv::Insertable::PaymentMethod(payment_method_new)), - }, - }; - - match Box::pin(kv_wrapper::( - self, - KvOperation::::HSetNx( - &field, - &storage_payment_method, - redis_entry, - ), - key, - )) - .await - .map_err(|err| err.to_redis_failed_response(&key_str))? - .try_into_hsetnx() - { - Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { - entity: "payment_method", - key: Some(storage_payment_method.get_id().clone()), - } - .into()), - Ok(HsetnxReply::KeySet) => Ok(storage_payment_method), - Err(er) => Err(er).change_context(errors::StorageError::KVError), - } - } - }?; - - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - - let merchant_id = payment_method.merchant_id.clone(); - let customer_id = payment_method.customer_id.clone(); - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; - let field = format!("payment_method_id_{}", payment_method.get_id()); - let storage_scheme = - Box::pin(decide_storage_scheme::<_, storage_types::PaymentMethod>( - self, - storage_scheme, - Op::Update(key.clone(), &field, payment_method.updated_by.as_deref()), - )) - .await; - let pm = match storage_scheme { - MerchantStorageScheme::PostgresOnly => { - let conn = connection::pg_connection_write(self).await?; - payment_method - .update_with_payment_method_id( - &conn, - payment_method_update.convert_to_payment_method_update(storage_scheme), - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - MerchantStorageScheme::RedisKv => { - let key_str = key.to_string(); - - let p_update: PaymentMethodUpdateInternal = - payment_method_update.convert_to_payment_method_update(storage_scheme); - let updated_payment_method = - p_update.clone().apply_changeset(payment_method.clone()); - - let redis_value = serde_json::to_string(&updated_payment_method) - .change_context(errors::StorageError::SerializationFailed)?; - - let redis_entry = kv::TypedSql { - op: kv::DBOperation::Update { - updatable: Box::new(kv::Updateable::PaymentMethodUpdate(Box::new( - kv::PaymentMethodUpdateMems { - orig: payment_method, - update_data: p_update, - }, - ))), - }, - }; - - Box::pin(kv_wrapper::<(), _, _>( - self, - KvOperation::::Hset( - (&field, redis_value), - redis_entry, - ), - key, - )) - .await - .map_err(|err| err.to_redis_failed_response(&key_str))? - .try_into_hset() - .change_context(errors::StorageError::KVError)?; - - Ok(updated_payment_method) - } - }?; - - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - #[instrument(skip_all)] - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - - let conn = connection::pg_connection_write(self).await?; - payment_method - .update_with_id(&conn, payment_method_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_list( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = storage_types::PaymentMethod::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - // Need to fix this once we start moving to v2 for payment method - #[cfg(all( - feature = "v2", - feature = "customer_v2", - feature = "payment_methods_v2" - ))] - async fn find_payment_method_list_by_global_customer_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::GlobalCustomerId, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = - storage_types::PaymentMethod::find_by_global_customer_id(&conn, customer_id, limit) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let database_call = || async { - storage_types::PaymentMethod::find_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - }; - - let payment_methods = match storage_scheme { - MerchantStorageScheme::PostgresOnly => database_call().await, - MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id, - customer_id, - }; - - let pattern = "payment_method_id_*"; - - let redis_fut = async { - let kv_result = Box::pin(kv_wrapper::( - self, - KvOperation::::Scan(pattern), - key, - )) - .await? - .try_into_scan(); - kv_result.map(|payment_methods| { - payment_methods - .into_iter() - .filter(|pm| pm.status == status) - .collect() - }) - }; - - Box::pin(db_utils::find_all_combined_kv_database( - redis_fut, - database_call, - limit, - )) - .await - } - }?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_payment_method_by_global_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = - storage_types::PaymentMethod::find_by_global_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn delete_payment_method_by_merchant_id_payment_method_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - merchant_id: &id_type::MerchantId, - payment_method_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id( - &conn, - merchant_id, - payment_method_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - // Soft delete, Check if KV stuff is needed here - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn delete_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - let conn = connection::pg_connection_write(self).await?; - let payment_method_update = storage_types::PaymentMethodUpdate::StatusUpdate { - status: Some(common_enums::PaymentMethodStatus::Inactive), - }; - payment_method - .update_with_id(&conn, payment_method_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - // Check if KV stuff is needed here - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method_by_fingerprint_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - fingerprint_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::find_by_fingerprint_id(&conn, fingerprint_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - } -} - -#[cfg(not(feature = "kv_store"))] -mod storage { - use common_utils::{id_type, types::keymanager::KeyManagerState}; - use error_stack::{report, ResultExt}; - use hyperswitch_domain_models::behaviour::{Conversion, ReverseConversion}; - use router_env::{instrument, tracing}; - - use super::PaymentMethodInterface; - use crate::{ - connection, - core::errors::{self, CustomResult}, - services::Store, - types::{ - domain, - storage::{self as storage_types, enums::MerchantStorageScheme}, - }, - }; - - #[async_trait::async_trait] - impl PaymentMethodInterface for Store { - #[instrument(skip_all)] - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &str, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &id_type::GlobalPaymentMethodId, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::find_by_id(&conn, payment_method_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_locker_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - locker_id: &str, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn get_payment_method_count_by_customer_id_merchant_id_status( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[instrument(skip_all)] - async fn insert_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method_new = payment_method - .construct_new() - .await - .change_context(errors::StorageError::DecryptionError)?; - - let conn = connection::pg_connection_write(self).await?; - payment_method_new - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - - let conn = connection::pg_connection_write(self).await?; - payment_method - .update_with_payment_method_id(&conn, payment_method_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - #[instrument(skip_all)] - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_method = payment_method - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?; - - let conn = connection::pg_connection_write(self).await?; - payment_method - .update_with_id(&conn, payment_method_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_list( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = storage_types::PaymentMethod::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - // Need to fix this once we move to payment method for customer - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_payment_method_list_by_global_customer_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - id: &id_type::GlobalCustomerId, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = - storage_types::PaymentMethod::find_by_global_customer_id(&conn, customer_id, limit) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = - storage_types::PaymentMethod::find_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - #[instrument(skip_all)] - async fn find_payment_method_by_global_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - limit: Option, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - let payment_methods = - storage_types::PaymentMethod::find_by_global_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))?; - - let pm_futures = payment_methods - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn delete_payment_method_by_merchant_id_payment_method_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - merchant_id: &id_type::MerchantId, - payment_method_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id( - &conn, - merchant_id, - payment_method_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn delete_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - ) -> CustomResult { - let payment_method = Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::DecryptionError)?; - let conn = connection::pg_connection_write(self).await?; - let payment_method_update = storage_types::PaymentMethodUpdate::StatusUpdate { - status: Some(common_enums::PaymentMethodStatus::Inactive), - }; - payment_method - .update_with_id(&conn, payment_method_update.into()) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method_by_fingerprint_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - fingerprint_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage_types::PaymentMethod::find_by_fingerprint_id(&conn, fingerprint_id) - .await - .map_err(|error| report!(errors::StorageError::from(error)))? - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - } - } -} - -#[async_trait::async_trait] -impl PaymentMethodInterface for MockDb { - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &str, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_methods = self.payment_methods.lock().await; - let payment_method = payment_methods - .iter() - .find(|pm| pm.get_id() == payment_method_id) - .cloned(); - - match payment_method { - Some(pm) => Ok(pm - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?), - None => Err(errors::StorageError::ValueNotFound( - "cannot find payment method".to_string(), - ) - .into()), - } - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method_id: &id_type::GlobalPaymentMethodId, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_methods = self.payment_methods.lock().await; - let payment_method = payment_methods - .iter() - .find(|pm| pm.get_id() == payment_method_id) - .cloned(); - - match payment_method { - Some(pm) => Ok(pm - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?), - None => Err(errors::StorageError::ValueNotFound( - "cannot find payment method".to_string(), - ) - .into()), - } - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method_by_locker_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - locker_id: &str, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let payment_methods = self.payment_methods.lock().await; - let payment_method = payment_methods - .iter() - .find(|pm| pm.locker_id == Some(locker_id.to_string())) - .cloned(); - - match payment_method { - Some(pm) => Ok(pm - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?), - None => Err(errors::StorageError::ValueNotFound( - "cannot find payment method".to_string(), - ) - .into()), - } - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn get_payment_method_count_by_customer_id_merchant_id_status( - &self, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - ) -> CustomResult { - let payment_methods = self.payment_methods.lock().await; - let count = payment_methods - .iter() - .filter(|pm| { - pm.customer_id == *customer_id - && pm.merchant_id == *merchant_id - && pm.status == status - }) - .count(); - i64::try_from(count).change_context(errors::StorageError::MockDbError) - } - - async fn insert_payment_method( - &self, - _state: &KeyManagerState, - _key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - let mut payment_methods = self.payment_methods.lock().await; - - let pm = Conversion::convert(payment_method.clone()) - .await - .change_context(errors::StorageError::DecryptionError)?; - - payment_methods.push(pm); - Ok(payment_method) - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method_by_customer_id_merchant_id_list( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - _limit: Option, - ) -> CustomResult, errors::StorageError> { - let payment_methods = self.payment_methods.lock().await; - let payment_methods_found: Vec = payment_methods - .iter() - .filter(|pm| pm.customer_id == *customer_id && pm.merchant_id == *merchant_id) - .cloned() - .collect(); - - if payment_methods_found.is_empty() { - Err( - errors::StorageError::ValueNotFound("cannot find payment method".to_string()) - .into(), - ) - } else { - let pm_futures = payment_methods_found - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - } - - // Need to fix this once we complete v2 payment method - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_payment_method_list_by_global_customer_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - _id: &id_type::GlobalCustomerId, - _limit: Option, - ) -> CustomResult, errors::StorageError> { - todo!() - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn find_payment_method_by_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::CustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - _limit: Option, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let payment_methods = self.payment_methods.lock().await; - let payment_methods_found: Vec = payment_methods - .iter() - .filter(|pm| { - pm.customer_id == *customer_id - && pm.merchant_id == *merchant_id - && pm.status == status - }) - .cloned() - .collect(); - - if payment_methods_found.is_empty() { - Err( - errors::StorageError::ValueNotFound("cannot find payment methods".to_string()) - .into(), - ) - } else { - let pm_futures = payment_methods_found - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - } - - #[cfg(all(feature = "v2", feature = "customer_v2"))] - async fn find_payment_method_by_global_customer_id_merchant_id_status( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - customer_id: &id_type::GlobalCustomerId, - merchant_id: &id_type::MerchantId, - status: common_enums::PaymentMethodStatus, - _limit: Option, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult, errors::StorageError> { - let payment_methods = self.payment_methods.lock().await; - let payment_methods_found: Vec = payment_methods - .iter() - .filter(|pm| { - pm.customer_id == *customer_id - && pm.merchant_id == *merchant_id - && pm.status == status - }) - .cloned() - .collect(); - - if payment_methods_found.is_empty() { - Err( - errors::StorageError::ValueNotFound("cannot find payment methods".to_string()) - .into(), - ) - } else { - let pm_futures = payment_methods_found - .into_iter() - .map(|pm| async { - pm.convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .collect::>(); - - let domain_payment_methods = futures::future::try_join_all(pm_futures).await?; - - Ok(domain_payment_methods) - } - } - - #[cfg(all( - any(feature = "v1", feature = "v2"), - not(feature = "payment_methods_v2") - ))] - async fn delete_payment_method_by_merchant_id_payment_method_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - merchant_id: &id_type::MerchantId, - payment_method_id: &str, - ) -> CustomResult { - let mut payment_methods = self.payment_methods.lock().await; - match payment_methods - .iter() - .position(|pm| pm.merchant_id == *merchant_id && pm.get_id() == payment_method_id) - { - Some(index) => { - let deleted_payment_method = payment_methods.remove(index); - Ok(deleted_payment_method - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?) - } - None => Err(errors::StorageError::ValueNotFound( - "cannot find payment method to delete".to_string(), - ) - .into()), - } - } - - async fn update_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - payment_method_update: storage_types::PaymentMethodUpdate, - _storage_scheme: MerchantStorageScheme, - ) -> CustomResult { - self.payment_methods - .lock() - .await - .iter_mut() - .find(|pm| pm.get_id() == payment_method.get_id()) - .async_map(|pm| async { - let payment_method_updated = - PaymentMethodUpdateInternal::from(payment_method_update).apply_changeset( - Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::EncryptionError)?, - ); - - *pm = payment_method_updated.clone(); - - payment_method_updated - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()? - .ok_or( - errors::StorageError::ValueNotFound( - "cannot find payment method to update".to_string(), - ) - .into(), - ) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn delete_payment_method( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - payment_method: domain::PaymentMethod, - ) -> CustomResult { - let payment_method_update = storage_types::PaymentMethodUpdate::StatusUpdate { - status: Some(common_enums::PaymentMethodStatus::Inactive), - }; - - self.payment_methods - .lock() - .await - .iter_mut() - .find(|pm| pm.get_id() == payment_method.get_id()) - .async_map(|pm| async { - let payment_method_updated = - PaymentMethodUpdateInternal::from(payment_method_update).apply_changeset( - Conversion::convert(payment_method) - .await - .change_context(errors::StorageError::EncryptionError)?, - ); - - *pm = payment_method_updated.clone(); - - payment_method_updated - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError) - }) - .await - .transpose()? - .ok_or( - errors::StorageError::ValueNotFound( - "cannot find payment method to update".to_string(), - ) - .into(), - ) - } - - #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] - async fn find_payment_method_by_fingerprint_id( - &self, - state: &KeyManagerState, - key_store: &domain::MerchantKeyStore, - fingerprint_id: &str, - ) -> CustomResult { - let payment_methods = self.payment_methods.lock().await; - let payment_method = payment_methods - .iter() - .find(|pm| pm.locker_fingerprint_id == Some(fingerprint_id.to_string())) - .cloned(); - - match payment_method { - Some(pm) => Ok(pm - .convert( - state, - key_store.key.get_inner(), - key_store.merchant_id.clone().into(), - ) - .await - .change_context(errors::StorageError::DecryptionError)?), - None => Err(errors::StorageError::ValueNotFound( - "cannot find payment method".to_string(), - ) - .into()), - } - } -} diff --git a/crates/router/src/routes/admin.rs b/crates/router/src/routes/admin.rs index 876fa0bf25..a38a9df72b 100644 --- a/crates/router/src/routes/admin.rs +++ b/crates/router/src/routes/admin.rs @@ -289,7 +289,7 @@ pub async fn delete_merchant_account( let mid = mid.into_inner(); let payload = web::Json(admin::MerchantId { merchant_id: mid }).into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -297,7 +297,7 @@ pub async fn delete_merchant_account( |state, _, req, _| merchant_account_delete(state, req.merchant_id), &auth::AdminApiAuth, api_locking::LockAction::NotApplicable, - ) + )) .await } diff --git a/crates/router/src/routes/api_keys.rs b/crates/router/src/routes/api_keys.rs index a96521c3ac..8c0b713766 100644 --- a/crates/router/src/routes/api_keys.rs +++ b/crates/router/src/routes/api_keys.rs @@ -190,7 +190,7 @@ pub async fn api_key_update( let mut payload = json_payload.into_inner(); payload.key_id = api_key_id; - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -212,7 +212,7 @@ pub async fn api_key_update( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } @@ -229,7 +229,7 @@ pub async fn api_key_revoke( let flow = Flow::ApiKeyRevoke; let (merchant_id, key_id) = path.into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -244,7 +244,7 @@ pub async fn api_key_revoke( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } @@ -261,7 +261,7 @@ pub async fn api_key_revoke( let flow = Flow::ApiKeyRevoke; let (merchant_id, key_id) = path.into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -276,7 +276,7 @@ pub async fn api_key_revoke( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } @@ -294,7 +294,7 @@ pub async fn api_key_list( let offset = list_api_key_constraints.skip; let merchant_id = path.into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -311,7 +311,7 @@ pub async fn api_key_list( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } #[cfg(feature = "v2")] @@ -324,7 +324,7 @@ pub async fn api_key_list( let flow = Flow::ApiKeyList; let payload = query.into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -346,6 +346,6 @@ pub async fn api_key_list( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index de5593703d..7bb42134aa 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -484,8 +484,9 @@ impl AppState { let tenant_conf = self.conf.multitenancy.get_tenant(tenant).ok_or_else(err)?; let mut event_handler = self.event_handler.clone(); event_handler.add_tenant(tenant_conf); + let store = self.stores.get(tenant).ok_or_else(err)?.clone(); Ok(SessionState { - store: self.stores.get(tenant).ok_or_else(err)?.clone(), + store, global_store: self.global_store.clone(), accounts_store: self.accounts_store.get(tenant).ok_or_else(err)?.clone(), conf: Arc::clone(&self.conf), diff --git a/crates/router/src/routes/customers.rs b/crates/router/src/routes/customers.rs index 20f53c17aa..c793e3c85e 100644 --- a/crates/router/src/routes/customers.rs +++ b/crates/router/src/routes/customers.rs @@ -125,7 +125,7 @@ pub async fn customers_list( let flow = Flow::CustomersList; let payload = query.into_inner(); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -147,7 +147,7 @@ pub async fn customers_list( req.headers(), ), api_locking::LockAction::NotApplicable, - ) + )) .await } diff --git a/crates/router/src/routes/dummy_connector.rs b/crates/router/src/routes/dummy_connector.rs index 709c76486c..5d425c675a 100644 --- a/crates/router/src/routes/dummy_connector.rs +++ b/crates/router/src/routes/dummy_connector.rs @@ -115,7 +115,7 @@ pub async fn dummy_connector_refund( let flow = types::Flow::DummyRefundCreate; let mut payload = json_payload.into_inner(); payload.payment_id = Some(path.into_inner()); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -123,7 +123,7 @@ pub async fn dummy_connector_refund( |state, _: (), req, _| core::refund_payment(state, req), &auth::NoAuth, api_locking::LockAction::NotApplicable, - ) + )) .await } diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 57f9fadcbc..39d825114b 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -25,7 +25,7 @@ pub use hyperswitch_interfaces::connector_integration_v2::{ }; use masking::{ExposeInterface, StrongSecret}; #[cfg(feature = "kv_store")] -use storage_impl::KVRouterStore; +use storage_impl::kv_router_store::KVRouterStore; use storage_impl::{config::TenantConfig, redis::RedisStore, RouterStore}; use tokio::sync::oneshot; diff --git a/crates/router/src/utils/db_utils.rs b/crates/router/src/utils/db_utils.rs index 07ec03f4de..80cf5a8331 100644 --- a/crates/router/src/utils/db_utils.rs +++ b/crates/router/src/utils/db_utils.rs @@ -36,73 +36,3 @@ where }, } } - -pub async fn find_all_combined_kv_database( - redis_fut: RFut, - database_call: F, - limit: Option, -) -> error_stack::Result, errors::StorageError> -where - T: UniqueConstraints, - F: FnOnce() -> DFut, - RFut: - futures::Future, redis_interface::errors::RedisError>>, - DFut: futures::Future, errors::StorageError>>, -{ - let trunc = |v: &mut Vec<_>| { - if let Some(l) = limit.and_then(|v| TryInto::try_into(v).ok()) { - v.truncate(l); - } - }; - - let limit_satisfies = |len: usize, limit: i64| { - TryInto::try_into(limit) - .ok() - .map_or(true, |val: usize| len >= val) - }; - - let redis_output = redis_fut.await; - match (redis_output, limit) { - (Ok(mut kv_rows), Some(lim)) if limit_satisfies(kv_rows.len(), lim) => { - trunc(&mut kv_rows); - Ok(kv_rows) - } - (Ok(kv_rows), _) => database_call().await.map(|db_rows| { - let mut res = union_vec(kv_rows, db_rows); - trunc(&mut res); - res - }), - (Err(redis_error), _) => match redis_error.current_context() { - redis_interface::errors::RedisError::NotFound => { - metrics::KV_MISS.add(1, &[]); - database_call().await - } - // Keeping the key empty here since the error would never go here. - _ => Err(redis_error.to_redis_failed_response("")), - }, - } -} - -use std::collections::HashSet; - -use storage_impl::UniqueConstraints; - -fn union_vec(mut kv_rows: Vec, sql_rows: Vec) -> Vec -where - T: UniqueConstraints, -{ - let mut kv_unique_keys = HashSet::new(); - - kv_rows.iter().for_each(|v| { - kv_unique_keys.insert(v.unique_constraints().concat()); - }); - - sql_rows.into_iter().for_each(|v| { - let unique_key = v.unique_constraints().concat(); - if !kv_unique_keys.contains(&unique_key) { - kv_rows.push(v); - } - }); - - kv_rows -} diff --git a/crates/router/src/workflows/payment_method_status_update.rs b/crates/router/src/workflows/payment_method_status_update.rs index dba3bace25..1e568c04c7 100644 --- a/crates/router/src/workflows/payment_method_status_update.rs +++ b/crates/router/src/workflows/payment_method_status_update.rs @@ -1,4 +1,5 @@ use common_utils::ext_traits::ValueExt; +use error_stack::ResultExt; use scheduler::{ consumer::types::process_data, utils as pt_utils, workflows::ProcessTrackerWorkflow, }; @@ -9,7 +10,6 @@ use crate::{ routes::SessionState, types::storage::{self, PaymentMethodStatusTrackingData}, }; - pub struct PaymentMethodStatusUpdateWorkflow; #[async_trait::async_trait] @@ -55,7 +55,9 @@ impl ProcessTrackerWorkflow for PaymentMethodStatusUpdateWorkflow &pm_id, merchant_account.storage_scheme, ) - .await?; + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unable to decode billing address")?; if payment_method.status != prev_pm_status { return db @@ -78,7 +80,8 @@ impl ProcessTrackerWorkflow for PaymentMethodStatusUpdateWorkflow merchant_account.storage_scheme, ) .await - .map_err(errors::ProcessTrackerError::EStorageError); + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unable to update payment method"); if let Ok(_pm) = res { db.as_scheduler() diff --git a/crates/scheduler/src/scheduler.rs b/crates/scheduler/src/scheduler.rs index 2685c6311e..7a5681147a 100644 --- a/crates/scheduler/src/scheduler.rs +++ b/crates/scheduler/src/scheduler.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use common_utils::{errors::CustomResult, id_type}; -use storage_impl::mock_db::MockDb; #[cfg(feature = "kv_store")] -use storage_impl::KVRouterStore; +use storage_impl::kv_router_store::KVRouterStore; +use storage_impl::mock_db::MockDb; #[cfg(not(feature = "kv_store"))] use storage_impl::RouterStore; use tokio::sync::mpsc; diff --git a/crates/storage_impl/src/kv_router_store.rs b/crates/storage_impl/src/kv_router_store.rs new file mode 100644 index 0000000000..0a7e46e3de --- /dev/null +++ b/crates/storage_impl/src/kv_router_store.rs @@ -0,0 +1,462 @@ +use std::{fmt::Debug, sync::Arc}; + +use common_enums::enums::MerchantStorageScheme; +use common_utils::{fallback_reverse_lookup_not_found, types::keymanager::KeyManagerState}; +use diesel_models::{errors::DatabaseError, kv}; +use error_stack::ResultExt; +use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, + errors::{self, StorageResult}, + merchant_key_store::MerchantKeyStore, +}; +#[cfg(not(feature = "payouts"))] +use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface}; +use masking::StrongSecret; +use redis_interface::{errors::RedisError, types::HsetnxReply, RedisConnectionPool}; +use router_env::logger; +use serde::de; + +#[cfg(not(feature = "payouts"))] +pub use crate::database::store::Store; +use crate::{ + config::TenantConfig, + database::store::PgPool, + diesel_error_to_data_error, + errors::RedisErrorExt, + lookup::ReverseLookupInterface, + metrics, + redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, KvStorePartition, Op, PartitionKey, + RedisConnInterface, + }, + utils::{find_all_combined_kv_database, try_redis_get_else_try_database_get}, + RouterStore, UniqueConstraints, +}; +pub use crate::{database::store::DatabaseStore, mock_db::MockDb}; + +#[derive(Debug, Clone)] +pub struct KVRouterStore { + pub router_store: RouterStore, + drainer_stream_name: String, + drainer_num_partitions: u8, + pub ttl_for_kv: u32, + pub request_id: Option, + pub soft_kill_mode: bool, +} + +pub struct InsertResourceParams<'a> { + pub insertable: kv::Insertable, + pub reverse_lookups: Vec, + pub key: PartitionKey<'a>, + // secondary key + pub identifier: String, + // type of resource Eg: "payment_attempt" + pub resource_type: &'static str, +} + +pub struct UpdateResourceParams<'a> { + pub updateable: kv::Updateable, + pub operation: Op<'a>, +} + +pub struct FilterResourceParams<'a> { + pub key: PartitionKey<'a>, + pub pattern: &'static str, + pub limit: Option, +} + +#[async_trait::async_trait] +impl DatabaseStore for KVRouterStore +where + RouterStore: DatabaseStore, + T: DatabaseStore, +{ + type Config = (RouterStore, String, u8, u32, Option); + async fn new( + config: Self::Config, + tenant_config: &dyn TenantConfig, + _test_transaction: bool, + ) -> StorageResult { + let (router_store, _, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = config; + let drainer_stream_name = format!("{}_{}", tenant_config.get_schema(), config.1); + Ok(Self::from_store( + router_store, + drainer_stream_name, + drainer_num_partitions, + ttl_for_kv, + soft_kill_mode, + )) + } + fn get_master_pool(&self) -> &PgPool { + self.router_store.get_master_pool() + } + fn get_replica_pool(&self) -> &PgPool { + self.router_store.get_replica_pool() + } + + fn get_accounts_master_pool(&self) -> &PgPool { + self.router_store.get_accounts_master_pool() + } + + fn get_accounts_replica_pool(&self) -> &PgPool { + self.router_store.get_accounts_replica_pool() + } +} + +impl RedisConnInterface for KVRouterStore { + fn get_redis_conn(&self) -> error_stack::Result, RedisError> { + self.router_store.get_redis_conn() + } +} + +impl KVRouterStore { + pub fn from_store( + store: RouterStore, + drainer_stream_name: String, + drainer_num_partitions: u8, + ttl_for_kv: u32, + soft_kill: Option, + ) -> Self { + let request_id = store.request_id.clone(); + + Self { + router_store: store, + drainer_stream_name, + drainer_num_partitions, + ttl_for_kv, + request_id, + soft_kill_mode: soft_kill.unwrap_or(false), + } + } + + pub fn master_key(&self) -> &StrongSecret> { + self.router_store.master_key() + } + + pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { + format!("{{{}}}_{}", shard_key, self.drainer_stream_name) + } + + pub async fn push_to_drainer_stream( + &self, + redis_entry: kv::TypedSql, + partition_key: PartitionKey<'_>, + ) -> error_stack::Result<(), RedisError> + where + R: KvStorePartition, + { + let global_id = format!("{}", partition_key); + let request_id = self.request_id.clone().unwrap_or_default(); + + let shard_key = R::shard_key(partition_key, self.drainer_num_partitions); + let stream_name = self.get_drainer_stream_name(&shard_key); + self.router_store + .cache_store + .redis_conn + .stream_append_entry( + &stream_name.into(), + &redis_interface::RedisEntryId::AutoGeneratedID, + redis_entry + .to_field_value_pairs(request_id, global_id) + .change_context(RedisError::JsonSerializationFailed)?, + ) + .await + .map(|_| metrics::KV_PUSHED_TO_DRAINER.add(1, &[])) + .inspect_err(|error| { + metrics::KV_FAILED_TO_PUSH_TO_DRAINER.add(1, &[]); + logger::error!(?error, "Failed to add entry in drainer stream"); + }) + .change_context(RedisError::StreamAppendFailed) + } + + pub async fn find_resource_by_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + find_resource_db_fut: R, + lookup_id: String, + ) -> error_stack::Result + where + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync + + ReverseConversion, + R: futures::Future> + Send, + { + let database_call = || async { + find_resource_db_fut.await.map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }) + }; + let storage_scheme = Box::pin(decide_storage_scheme::( + self, + storage_scheme, + Op::Find, + )) + .await; + let res = || async { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await, + database_call().await + ); + + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; + + Box::pin(try_redis_get_else_try_database_get( + async { + Box::pin(kv_wrapper(self, KvOperation::::HGet(&lookup.sk_id), key)) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + } + }; + res() + .await? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } + + pub async fn insert_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + create_resource_fut: R, + resource_new: M, + InsertResourceParams { + insertable, + reverse_lookups, + key, + identifier, + resource_type, + }: InsertResourceParams<'_>, + ) -> error_stack::Result + where + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync + + ReverseConversion, + R: futures::Future> + Send, + { + let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( + self, + storage_scheme, + Op::Insert, + )) + .await; + match storage_scheme { + MerchantStorageScheme::PostgresOnly => create_resource_fut.await.map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }), + MerchantStorageScheme::RedisKv => { + let key_str = key.to_string(); + let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { + sk_id: identifier.clone(), + pk_id: key_str.clone(), + lookup_id: v, + source: resource_type.to_string(), + updated_by: storage_scheme.to_string(), + }; + let results = reverse_lookups + .into_iter() + .map(|v| self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme)); + + futures::future::try_join_all(results).await?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: Box::new(insertable), + }, + }; + match Box::pin(kv_wrapper::( + self, + KvOperation::::HSetNx(&identifier, &resource_new, redis_entry), + key.clone(), + )) + .await + .map_err(|err| err.to_redis_failed_response(&key.to_string()))? + .try_into_hsetnx() + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: resource_type, + key: Some(key_str), + } + .into()), + Ok(HsetnxReply::KeySet) => Ok(resource_new), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + }? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } + + pub async fn update_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + update_resource_fut: R, + updated_resource: M, + UpdateResourceParams { + updateable, + operation, + }: UpdateResourceParams<'_>, + ) -> error_stack::Result + where + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync + + ReverseConversion, + R: futures::Future> + Send, + { + match operation { + Op::Update(key, field, updated_by) => { + let storage_scheme = Box::pin(decide_storage_scheme::<_, M>( + self, + storage_scheme, + Op::Update(key.clone(), field, updated_by), + )) + .await; + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + update_resource_fut.await.map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }) + } + MerchantStorageScheme::RedisKv => { + let key_str = key.to_string(); + let redis_value = serde_json::to_string(&updated_resource) + .change_context(errors::StorageError::SerializationFailed)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: Box::new(updateable), + }, + }; + Box::pin(kv_wrapper::<(), _, _>( + self, + KvOperation::::Hset((field, redis_value), redis_entry), + key, + )) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hset() + .change_context(errors::StorageError::KVError)?; + Ok(updated_resource) + } + } + } + _ => Err(errors::StorageError::KVError.into()), + }? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + } + pub async fn filter_resources( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + storage_scheme: MerchantStorageScheme, + filter_resource_db_fut: R, + filter_fn: impl Fn(&M) -> bool, + FilterResourceParams { + key, + pattern, + limit, + }: FilterResourceParams<'_>, + ) -> error_stack::Result, errors::StorageError> + where + D: Debug + Sync + Conversion, + M: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync + + ReverseConversion, + R: futures::Future, DatabaseError>> + Send, + { + let db_call = || async { + filter_resource_db_fut.await.map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }) + }; + let resources = match storage_scheme { + MerchantStorageScheme::PostgresOnly => db_call().await, + MerchantStorageScheme::RedisKv => { + let redis_fut = async { + let kv_result = Box::pin(kv_wrapper::( + self, + KvOperation::::Scan(pattern), + key, + )) + .await? + .try_into_scan(); + kv_result.map(|records| records.into_iter().filter(filter_fn).collect()) + }; + + Box::pin(find_all_combined_kv_database(redis_fut, db_call, limit)).await + } + }?; + let resource_futures = resources + .into_iter() + .map(|pm| async { + pm.convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError) + }) + .collect::>(); + futures::future::try_join_all(resource_futures).await + } +} + +#[cfg(not(feature = "payouts"))] +impl PayoutAttemptInterface for KVRouterStore {} +#[cfg(not(feature = "payouts"))] +impl PayoutsInterface for KVRouterStore {} diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 137cd2a5d4..13b4671e86 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -1,8 +1,12 @@ -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use diesel_models as store; use error_stack::ResultExt; -use hyperswitch_domain_models::errors::{StorageError, StorageResult}; +use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, + errors::{StorageError, StorageResult}, + merchant_key_store::MerchantKeyStore, +}; use masking::StrongSecret; use redis::{kv_store::RedisConnInterface, pub_sub::PubSubInterface, RedisStore}; mod address; @@ -12,7 +16,8 @@ pub mod connection; pub mod customers; pub mod database; pub mod errors; -mod lookup; +pub mod kv_router_store; +pub mod lookup; pub mod mandate; pub mod metrics; pub mod mock_db; @@ -23,15 +28,14 @@ pub mod payouts; pub mod redis; pub mod refund; mod reverse_lookup; -mod utils; +pub mod utils; -use common_utils::errors::CustomResult; +use common_utils::{errors::CustomResult, types::keymanager::KeyManagerState}; use database::store::PgPool; #[cfg(not(feature = "payouts"))] use hyperswitch_domain_models::{PayoutAttemptInterface, PayoutsInterface}; pub use mock_db::MockDb; use redis_interface::{errors::RedisError, RedisConnectionPool, SaddReply}; -use router_env::logger; pub use crate::database::store::DatabaseStore; #[cfg(not(feature = "payouts"))] @@ -149,6 +153,70 @@ impl RouterStore { &self.master_encryption_key } + pub async fn call_database( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + execute_query: R, + ) -> error_stack::Result + where + D: Debug + Sync + Conversion, + R: futures::Future> + + Send, + M: ReverseConversion, + { + execute_query + .await + .map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + })? + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError) + } + + pub async fn find_resources( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + execute_query: R, + ) -> error_stack::Result, StorageError> + where + D: Debug + Sync + Conversion, + R: futures::Future< + Output = error_stack::Result, diesel_models::errors::DatabaseError>, + > + Send, + M: ReverseConversion, + { + let resource_futures = execute_query + .await + .map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + })? + .into_iter() + .map(|resource| async { + resource + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError) + }) + .collect::>(); + + let resources = futures::future::try_join_all(resource_futures).await?; + + Ok(resources) + } + /// # Panics /// /// Will panic if `CONNECTOR_AUTH_FILE_PATH` is not set @@ -173,121 +241,6 @@ impl RouterStore { } } -#[derive(Debug, Clone)] -pub struct KVRouterStore { - router_store: RouterStore, - drainer_stream_name: String, - drainer_num_partitions: u8, - ttl_for_kv: u32, - pub request_id: Option, - soft_kill_mode: bool, -} - -#[async_trait::async_trait] -impl DatabaseStore for KVRouterStore -where - RouterStore: DatabaseStore, - T: DatabaseStore, -{ - type Config = (RouterStore, String, u8, u32, Option); - async fn new( - config: Self::Config, - tenant_config: &dyn config::TenantConfig, - _test_transaction: bool, - ) -> StorageResult { - let (router_store, _, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = config; - let drainer_stream_name = format!("{}_{}", tenant_config.get_schema(), config.1); - Ok(Self::from_store( - router_store, - drainer_stream_name, - drainer_num_partitions, - ttl_for_kv, - soft_kill_mode, - )) - } - fn get_master_pool(&self) -> &PgPool { - self.router_store.get_master_pool() - } - fn get_replica_pool(&self) -> &PgPool { - self.router_store.get_replica_pool() - } - - fn get_accounts_master_pool(&self) -> &PgPool { - self.router_store.get_accounts_master_pool() - } - - fn get_accounts_replica_pool(&self) -> &PgPool { - self.router_store.get_accounts_replica_pool() - } -} - -impl RedisConnInterface for KVRouterStore { - fn get_redis_conn(&self) -> error_stack::Result, RedisError> { - self.router_store.get_redis_conn() - } -} - -impl KVRouterStore { - pub fn from_store( - store: RouterStore, - drainer_stream_name: String, - drainer_num_partitions: u8, - ttl_for_kv: u32, - soft_kill: Option, - ) -> Self { - let request_id = store.request_id.clone(); - - Self { - router_store: store, - drainer_stream_name, - drainer_num_partitions, - ttl_for_kv, - request_id, - soft_kill_mode: soft_kill.unwrap_or(false), - } - } - - pub fn master_key(&self) -> &StrongSecret> { - self.router_store.master_key() - } - - pub fn get_drainer_stream_name(&self, shard_key: &str) -> String { - format!("{{{}}}_{}", shard_key, self.drainer_stream_name) - } - - pub async fn push_to_drainer_stream( - &self, - redis_entry: diesel_models::kv::TypedSql, - partition_key: redis::kv_store::PartitionKey<'_>, - ) -> error_stack::Result<(), RedisError> - where - R: redis::kv_store::KvStorePartition, - { - let global_id = format!("{}", partition_key); - let request_id = self.request_id.clone().unwrap_or_default(); - - let shard_key = R::shard_key(partition_key, self.drainer_num_partitions); - let stream_name = self.get_drainer_stream_name(&shard_key); - self.router_store - .cache_store - .redis_conn - .stream_append_entry( - &stream_name.into(), - &redis_interface::RedisEntryId::AutoGeneratedID, - redis_entry - .to_field_value_pairs(request_id, global_id) - .change_context(RedisError::JsonSerializationFailed)?, - ) - .await - .map(|_| metrics::KV_PUSHED_TO_DRAINER.add(1, &[])) - .inspect_err(|error| { - metrics::KV_FAILED_TO_PUSH_TO_DRAINER.add(1, &[]); - logger::error!(?error, "Failed to add entry in drainer stream"); - }) - .change_context(RedisError::StreamAppendFailed) - } -} - // TODO: This should not be used beyond this crate // Remove the pub modified once StorageScheme usage is completed pub trait DataModelExt { @@ -503,11 +456,7 @@ impl UniqueConstraints for diesel_models::Customer { } } -#[cfg(not(feature = "payouts"))] -impl PayoutAttemptInterface for KVRouterStore {} #[cfg(not(feature = "payouts"))] impl PayoutAttemptInterface for RouterStore {} #[cfg(not(feature = "payouts"))] -impl PayoutsInterface for KVRouterStore {} -#[cfg(not(feature = "payouts"))] impl PayoutsInterface for RouterStore {} diff --git a/crates/storage_impl/src/lookup.rs b/crates/storage_impl/src/lookup.rs index 2f8a743a81..2ea9cfd5c8 100644 --- a/crates/storage_impl/src/lookup.rs +++ b/crates/storage_impl/src/lookup.rs @@ -12,9 +12,10 @@ use redis_interface::SetnxReply; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, + kv_router_store::KVRouterStore, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, try_redis_get_else_try_database_get}, - DatabaseStore, KVRouterStore, RouterStore, + DatabaseStore, RouterStore, }; #[async_trait::async_trait] diff --git a/crates/storage_impl/src/mock_db.rs b/crates/storage_impl/src/mock_db.rs index b81676a6cc..90d246da15 100644 --- a/crates/storage_impl/src/mock_db.rs +++ b/crates/storage_impl/src/mock_db.rs @@ -1,10 +1,13 @@ use std::sync::Arc; +use common_utils::{errors::CustomResult, types::keymanager::KeyManagerState}; use diesel_models as store; use error_stack::ResultExt; -use futures::lock::Mutex; +use futures::lock::{Mutex, MutexGuard}; use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, errors::StorageError, + merchant_key_store::MerchantKeyStore, payments::{payment_attempt::PaymentAttempt, PaymentIntent}, }; use redis_interface::RedisSettings; @@ -109,6 +112,96 @@ impl MockDb { themes: Default::default(), }) } + + pub async fn find_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + resources: MutexGuard<'_, Vec>, + filter_fn: impl Fn(&&D) -> bool, + error_message: String, + ) -> CustomResult + where + D: Sync + ReverseConversion + Clone, + R: Conversion, + { + let resource = resources.iter().find(filter_fn).cloned(); + match resource { + Some(res) => Ok(res + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError)?), + None => Err(StorageError::ValueNotFound(error_message).into()), + } + } + + pub async fn find_resources( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + resources: MutexGuard<'_, Vec>, + filter_fn: impl Fn(&&D) -> bool, + error_message: String, + ) -> CustomResult, StorageError> + where + D: Sync + ReverseConversion + Clone, + R: Conversion, + { + let resources: Vec<_> = resources.iter().filter(filter_fn).cloned().collect(); + if resources.is_empty() { + Err(StorageError::ValueNotFound(error_message).into()) + } else { + let pm_futures = resources + .into_iter() + .map(|pm| async { + pm.convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError) + }) + .collect::>(); + + let domain_resources = futures::future::try_join_all(pm_futures).await?; + + Ok(domain_resources) + } + } + + pub async fn update_resource( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + mut resources: MutexGuard<'_, Vec>, + resource_updated: D, + filter_fn: impl Fn(&&mut D) -> bool, + error_message: String, + ) -> CustomResult + where + D: Sync + ReverseConversion + Clone, + R: Conversion, + { + if let Some(pm) = resources.iter_mut().find(filter_fn) { + *pm = resource_updated.clone(); + let result = resource_updated + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(StorageError::DecryptionError)?; + Ok(result) + } else { + Err(StorageError::ValueNotFound(error_message).into()) + } + } } #[cfg(not(feature = "payouts"))] diff --git a/crates/storage_impl/src/payment_method.rs b/crates/storage_impl/src/payment_method.rs index 254505eaae..5ed5e36fb3 100644 --- a/crates/storage_impl/src/payment_method.rs +++ b/crates/storage_impl/src/payment_method.rs @@ -1,5 +1,1017 @@ -use diesel_models::PaymentMethod; +use diesel_models::payment_method::PaymentMethod; use crate::redis::kv_store::KvStorePartition; impl KvStorePartition for PaymentMethod {} + +use common_enums::enums::MerchantStorageScheme; +use common_utils::{errors::CustomResult, id_type, types::keymanager::KeyManagerState}; +use diesel_models::{ + kv, + payment_method::{PaymentMethodUpdate, PaymentMethodUpdateInternal}, +}; +use error_stack::ResultExt; +use hyperswitch_domain_models::{ + behaviour::{Conversion, ReverseConversion}, + errors, + merchant_key_store::MerchantKeyStore, + payment_methods::{PaymentMethod as DomainPaymentMethod, PaymentMethodInterface}, +}; +use router_env::{instrument, tracing}; + +use super::MockDb; +use crate::{ + diesel_error_to_data_error, + kv_router_store::{ + FilterResourceParams, InsertResourceParams, KVRouterStore, UpdateResourceParams, + }, + redis::kv_store::{Op, PartitionKey}, + utils::{pg_connection_read, pg_connection_write}, + DatabaseStore, RouterStore, +}; + +#[async_trait::async_trait] +impl PaymentMethodInterface for KVRouterStore { + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.find_resource_by_id( + state, + key_store, + storage_scheme, + PaymentMethod::find_by_payment_method_id(&conn, payment_method_id), + format!("payment_method_{}", payment_method_id), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + #[instrument(skip_all)] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &id_type::GlobalPaymentMethodId, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.find_resource_by_id( + state, + key_store, + storage_scheme, + PaymentMethod::find_by_id(&conn, payment_method_id), + format!("payment_method_{}", payment_method_id.get_string_repr()), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_locker_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + locker_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.find_resource_by_id( + state, + key_store, + storage_scheme, + PaymentMethod::find_by_locker_id(&conn, locker_id), + format!("payment_method_locker_{}", locker_id), + ) + .await + } + + // not supported in kv + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult { + self.router_store + .get_payment_method_count_by_customer_id_merchant_id_status( + customer_id, + merchant_id, + status, + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + #[instrument(skip_all)] + async fn insert_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + self.router_store + .insert_payment_method(state, key_store, payment_method, storage_scheme) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn insert_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + let mut payment_method_new = payment_method + .construct_new() + .await + .change_context(errors::StorageError::DecryptionError)?; + payment_method_new.update_storage_scheme(storage_scheme); + + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &payment_method_new.merchant_id.clone(), + customer_id: &payment_method_new.customer_id.clone(), + }; + let identifier = format!("payment_method_id_{}", payment_method_new.get_id()); + let lookup_id1 = format!("payment_method_{}", payment_method_new.get_id()); + let mut reverse_lookups = vec![lookup_id1]; + if let Some(locker_id) = &payment_method_new.locker_id { + reverse_lookups.push(format!("payment_method_locker_{}", locker_id)) + } + let payment_method = (&payment_method_new.clone()).into(); + self.insert_resource( + state, + key_store, + storage_scheme, + payment_method_new.clone().insert(&conn), + payment_method, + InsertResourceParams { + insertable: kv::Insertable::PaymentMethod(payment_method_new.clone()), + reverse_lookups, + key, + identifier, + resource_type: "payment_method", + }, + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn update_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + payment_method_update: PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_method = Conversion::convert(payment_method) + .await + .change_context(errors::StorageError::DecryptionError)?; + + let merchant_id = payment_method.merchant_id.clone(); + let customer_id = payment_method.customer_id.clone(); + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let conn = pg_connection_write(self).await?; + let field = format!("payment_method_id_{}", payment_method.get_id().clone()); + let p_update: PaymentMethodUpdateInternal = + payment_method_update.convert_to_payment_method_update(storage_scheme); + let updated_payment_method = p_update.clone().apply_changeset(payment_method.clone()); + self.update_resource( + state, + key_store, + storage_scheme, + payment_method + .clone() + .update_with_payment_method_id(&conn, p_update.clone()), + updated_payment_method, + UpdateResourceParams { + updateable: kv::Updateable::PaymentMethodUpdate(Box::new( + kv::PaymentMethodUpdateMems { + orig: payment_method.clone(), + update_data: p_update.clone(), + }, + )), + operation: Op::Update( + key.clone(), + &field, + payment_method.clone().updated_by.as_deref(), + ), + }, + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + #[instrument(skip_all)] + async fn update_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + payment_method_update: PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + self.router_store + .update_payment_method( + state, + key_store, + payment_method, + payment_method_update, + storage_scheme, + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + limit: Option, + ) -> CustomResult, errors::StorageError> { + self.router_store + .find_payment_method_by_customer_id_merchant_id_list( + state, + key_store, + customer_id, + merchant_id, + limit, + ) + .await + } + + // Need to fix this once we start moving to v2 for payment method + #[cfg(all( + feature = "v2", + feature = "customer_v2", + feature = "payment_methods_v2" + ))] + async fn find_payment_method_list_by_global_customer_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::GlobalCustomerId, + limit: Option, + ) -> CustomResult, errors::StorageError> { + self.router_store + .find_payment_method_list_by_global_customer_id(state, key_store, customer_id, limit) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection_read(self).await?; + self.filter_resources( + state, + key_store, + storage_scheme, + PaymentMethod::find_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + limit, + ), + |pm| pm.status == status, + FilterResourceParams { + key: PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id, + }, + pattern: "payment_method_id_*", + limit, + }, + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_payment_method_by_global_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + self.router_store + .find_payment_method_by_global_customer_id_merchant_id_status( + state, + key_store, + customer_id, + merchant_id, + status, + limit, + storage_scheme, + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + merchant_id: &id_type::MerchantId, + payment_method_id: &str, + ) -> CustomResult { + self.router_store + .delete_payment_method_by_merchant_id_payment_method_id( + state, + key_store, + merchant_id, + payment_method_id, + ) + .await + } + + // Soft delete, Check if KV stuff is needed here + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn delete_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + ) -> CustomResult { + self.router_store + .delete_payment_method(state, key_store, payment_method) + .await + } + + // Check if KV stuff is needed here + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method_by_fingerprint_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + fingerprint_id: &str, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.router_store + .find_payment_method_by_fingerprint_id(state, key_store, fingerprint_id) + .await + } +} + +#[async_trait::async_trait] +impl PaymentMethodInterface for RouterStore { + #[instrument(skip_all)] + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.call_database( + state, + key_store, + PaymentMethod::find_by_payment_method_id(&conn, payment_method_id), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &id_type::GlobalPaymentMethodId, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.call_database( + state, + key_store, + PaymentMethod::find_by_id(&conn, payment_method_id), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_locker_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + locker_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.call_database( + state, + key_store, + PaymentMethod::find_by_locker_id(&conn, locker_id), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + PaymentMethod::get_count_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + ) + .await + .map_err(|error| { + let new_err = diesel_error_to_data_error(*error.current_context()); + error.change_context(new_err) + }) + } + + #[instrument(skip_all)] + async fn insert_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_method_new = payment_method + .construct_new() + .await + .change_context(errors::StorageError::DecryptionError)?; + + let conn = pg_connection_write(self).await?; + self.call_database(state, key_store, payment_method_new.insert(&conn)) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn update_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + payment_method_update: PaymentMethodUpdate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_method = Conversion::convert(payment_method) + .await + .change_context(errors::StorageError::DecryptionError)?; + + let conn = pg_connection_write(self).await?; + self.call_database( + state, + key_store, + payment_method.update_with_payment_method_id(&conn, payment_method_update.into()), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + #[instrument(skip_all)] + async fn update_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + payment_method_update: PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_method = Conversion::convert(payment_method) + .await + .change_context(errors::StorageError::DecryptionError)?; + let conn = pg_connection_write(self).await?; + self.call_database( + state, + key_store, + payment_method.update_with_id(&conn, payment_method_update.into()), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection_read(self).await?; + self.find_resources( + state, + key_store, + PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id, limit), + ) + .await + } + + // Need to fix this once we move to payment method for customer + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_payment_method_list_by_global_customer_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + id: &id_type::GlobalCustomerId, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection_read(self).await?; + self.find_resources( + state, + key_store, + PaymentMethod::find_by_global_customer_id(&conn, id, limit), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection_read(self).await?; + self.find_resources( + state, + key_store, + PaymentMethod::find_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + limit, + ), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + #[instrument(skip_all)] + async fn find_payment_method_by_global_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + limit: Option, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let conn = pg_connection_read(self).await?; + self.find_resources( + state, + key_store, + PaymentMethod::find_by_global_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + limit, + ), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + merchant_id: &id_type::MerchantId, + payment_method_id: &str, + ) -> CustomResult { + let conn = pg_connection_write(self).await?; + self.call_database( + state, + key_store, + PaymentMethod::delete_by_merchant_id_payment_method_id( + &conn, + merchant_id, + payment_method_id, + ), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn delete_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + ) -> CustomResult { + let payment_method = Conversion::convert(payment_method) + .await + .change_context(errors::StorageError::DecryptionError)?; + let conn = pg_connection_write(self).await?; + let payment_method_update = PaymentMethodUpdate::StatusUpdate { + status: Some(common_enums::PaymentMethodStatus::Inactive), + }; + self.call_database( + state, + key_store, + payment_method.update_with_id(&conn, payment_method_update.into()), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method_by_fingerprint_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + fingerprint_id: &str, + ) -> CustomResult { + let conn = pg_connection_read(self).await?; + self.call_database( + state, + key_store, + PaymentMethod::find_by_fingerprint_id(&conn, fingerprint_id), + ) + .await + } +} + +#[async_trait::async_trait] +impl PaymentMethodInterface for MockDb { + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_methods = self.payment_methods.lock().await; + self.find_resource::( + state, + key_store, + payment_methods, + |pm| pm.get_id() == payment_method_id, + "cannot find payment method".to_string(), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method_id: &id_type::GlobalPaymentMethodId, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_methods = self.payment_methods.lock().await; + self.find_resource::( + state, + key_store, + payment_methods, + |pm| pm.get_id() == payment_method_id, + "cannot find payment method".to_string(), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method_by_locker_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + locker_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_methods = self.payment_methods.lock().await; + self.find_resource::( + state, + key_store, + payment_methods, + |pm| pm.locker_id == Some(locker_id.to_string()), + "cannot find payment method".to_string(), + ) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult { + let payment_methods = self.payment_methods.lock().await; + let count = payment_methods + .iter() + .filter(|pm| { + pm.customer_id == *customer_id + && pm.merchant_id == *merchant_id + && pm.status == status + }) + .count(); + i64::try_from(count).change_context(errors::StorageError::MockDbError) + } + + async fn insert_payment_method( + &self, + _state: &KeyManagerState, + _key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let mut payment_methods = self.payment_methods.lock().await; + + let pm = Conversion::convert(payment_method.clone()) + .await + .change_context(errors::StorageError::DecryptionError)?; + + payment_methods.push(pm); + Ok(payment_method) + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + _limit: Option, + ) -> CustomResult, errors::StorageError> { + let payment_methods = self.payment_methods.lock().await; + self.find_resources( + state, + key_store, + payment_methods, + |pm| pm.customer_id == *customer_id && pm.merchant_id == *merchant_id, + "cannot find payment method".to_string(), + ) + .await + } + + // Need to fix this once we complete v2 payment method + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_payment_method_list_by_global_customer_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + _id: &id_type::GlobalCustomerId, + _limit: Option, + ) -> CustomResult, errors::StorageError> { + todo!() + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::CustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + _limit: Option, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let payment_methods = self.payment_methods.lock().await; + self.find_resources( + state, + key_store, + payment_methods, + |pm| { + pm.customer_id == *customer_id + && pm.merchant_id == *merchant_id + && pm.status == status + }, + "cannot find payment method".to_string(), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "customer_v2"))] + async fn find_payment_method_by_global_customer_id_merchant_id_status( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + customer_id: &id_type::GlobalCustomerId, + merchant_id: &id_type::MerchantId, + status: common_enums::PaymentMethodStatus, + _limit: Option, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult, errors::StorageError> { + let payment_methods = self.payment_methods.lock().await; + let find_pm_by = |pm: &&PaymentMethod| { + pm.customer_id == *customer_id && pm.merchant_id == *merchant_id && pm.status == status + }; + let error_message = "cannot find payment method".to_string(); + self.find_resources(state, key_store, payment_methods, find_pm_by, error_message) + .await + } + + #[cfg(all( + any(feature = "v1", feature = "v2"), + not(feature = "payment_methods_v2") + ))] + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + merchant_id: &id_type::MerchantId, + payment_method_id: &str, + ) -> CustomResult { + let mut payment_methods = self.payment_methods.lock().await; + match payment_methods + .iter() + .position(|pm| pm.merchant_id == *merchant_id && pm.get_id() == payment_method_id) + { + Some(index) => { + let deleted_payment_method = payment_methods.remove(index); + Ok(deleted_payment_method + .convert( + state, + key_store.key.get_inner(), + key_store.merchant_id.clone().into(), + ) + .await + .change_context(errors::StorageError::DecryptionError)?) + } + None => Err(errors::StorageError::ValueNotFound( + "cannot find payment method to delete".to_string(), + ) + .into()), + } + } + + async fn update_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + payment_method_update: PaymentMethodUpdate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let payment_method_updated = PaymentMethodUpdateInternal::from(payment_method_update) + .apply_changeset( + Conversion::convert(payment_method.clone()) + .await + .change_context(errors::StorageError::EncryptionError)?, + ); + self.update_resource::( + state, + key_store, + self.payment_methods.lock().await, + payment_method_updated, + |pm| pm.get_id() == payment_method.get_id(), + "cannot update payment method".to_string(), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn delete_payment_method( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + payment_method: DomainPaymentMethod, + ) -> CustomResult { + let payment_method_update = PaymentMethodUpdate::StatusUpdate { + status: Some(common_enums::PaymentMethodStatus::Inactive), + }; + let payment_method_updated = PaymentMethodUpdateInternal::from(payment_method_update) + .apply_changeset( + Conversion::convert(payment_method.clone()) + .await + .change_context(errors::StorageError::EncryptionError)?, + ); + self.update_resource::( + state, + key_store, + self.payment_methods.lock().await, + payment_method_updated, + |pm| pm.get_id() == payment_method.get_id(), + "cannot find payment method".to_string(), + ) + .await + } + + #[cfg(all(feature = "v2", feature = "payment_methods_v2"))] + async fn find_payment_method_by_fingerprint_id( + &self, + state: &KeyManagerState, + key_store: &MerchantKeyStore, + fingerprint_id: &str, + ) -> CustomResult { + let payment_methods = self.payment_methods.lock().await; + self.find_resource::( + state, + key_store, + payment_methods, + |pm| pm.locker_fingerprint_id == Some(fingerprint_id.to_string()), + "cannot find payment method".to_string(), + ) + .await + } +} diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index 96643cefc0..076b120c2d 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -39,10 +39,11 @@ use router_env::{instrument, tracing}; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, + kv_router_store::KVRouterStore, lookup::ReverseLookupInterface, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get}, - DataModelExt, DatabaseStore, KVRouterStore, RouterStore, + DataModelExt, DatabaseStore, RouterStore, }; #[async_trait::async_trait] diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index e53a72c873..054326fe6a 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -51,9 +51,10 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, + kv_router_store::KVRouterStore, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, - DatabaseStore, KVRouterStore, + DatabaseStore, }; #[async_trait::async_trait] diff --git a/crates/storage_impl/src/payouts/payout_attempt.rs b/crates/storage_impl/src/payouts/payout_attempt.rs index caefa4eeda..a56b3cd2c5 100644 --- a/crates/storage_impl/src/payouts/payout_attempt.rs +++ b/crates/storage_impl/src/payouts/payout_attempt.rs @@ -29,10 +29,11 @@ use router_env::{instrument, logger, tracing}; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, + kv_router_store::KVRouterStore, lookup::ReverseLookupInterface, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, - DataModelExt, DatabaseStore, KVRouterStore, + DataModelExt, DatabaseStore, }; #[async_trait::async_trait] diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index 0c69b2f241..25e7dd95f6 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -69,9 +69,10 @@ use crate::store::schema::{ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, + kv_router_store::KVRouterStore, redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, - DataModelExt, DatabaseStore, KVRouterStore, + DataModelExt, DatabaseStore, }; #[async_trait::async_trait] diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 6e1340abca..724a4b1b3c 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -8,7 +8,7 @@ use router_derive::TryGetEnumVariant; use router_env::logger; use serde::de; -use crate::{metrics, store::kv::TypedSql, KVRouterStore, UniqueConstraints}; +use crate::{kv_router_store::KVRouterStore, metrics, store::kv::TypedSql, UniqueConstraints}; pub trait KvStorePartition { fn partition_number(key: PartitionKey<'_>, num_partitions: u8) -> u32 { diff --git a/crates/storage_impl/src/utils.rs b/crates/storage_impl/src/utils.rs index 01e0e8cbc1..a5fb19a93d 100644 --- a/crates/storage_impl/src/utils.rs +++ b/crates/storage_impl/src/utils.rs @@ -67,3 +67,73 @@ where }, } } + +use std::collections::HashSet; + +use crate::UniqueConstraints; + +fn union_vec(mut kv_rows: Vec, sql_rows: Vec) -> Vec +where + T: UniqueConstraints, +{ + let mut kv_unique_keys = HashSet::new(); + + kv_rows.iter().for_each(|v| { + kv_unique_keys.insert(v.unique_constraints().concat()); + }); + + sql_rows.into_iter().for_each(|v| { + let unique_key = v.unique_constraints().concat(); + if !kv_unique_keys.contains(&unique_key) { + kv_rows.push(v); + } + }); + + kv_rows +} + +pub async fn find_all_combined_kv_database( + redis_fut: RFut, + database_call: F, + limit: Option, +) -> error_stack::Result, StorageError> +where + T: UniqueConstraints, + F: FnOnce() -> DFut, + RFut: + futures::Future, redis_interface::errors::RedisError>>, + DFut: futures::Future, StorageError>>, +{ + let trunc = |v: &mut Vec<_>| { + if let Some(l) = limit.and_then(|v| TryInto::try_into(v).ok()) { + v.truncate(l); + } + }; + + let limit_satisfies = |len: usize, limit: i64| { + TryInto::try_into(limit) + .ok() + .map_or(true, |val: usize| len >= val) + }; + + let redis_output = redis_fut.await; + match (redis_output, limit) { + (Ok(mut kv_rows), Some(lim)) if limit_satisfies(kv_rows.len(), lim) => { + trunc(&mut kv_rows); + Ok(kv_rows) + } + (Ok(kv_rows), _) => database_call().await.map(|db_rows| { + let mut res = union_vec(kv_rows, db_rows); + trunc(&mut res); + res + }), + (Err(redis_error), _) => match redis_error.current_context() { + redis_interface::errors::RedisError::NotFound => { + metrics::KV_MISS.add(1, &[]); + database_call().await + } + // Keeping the key empty here since the error would never go here. + _ => Err(redis_error.to_redis_failed_response("")), + }, + } +}