From eb3cecdd74b4c758948f9de82727af76b9ba9fb0 Mon Sep 17 00:00:00 2001 From: akshay-97 Date: Wed, 10 Apr 2024 19:29:50 +0530 Subject: [PATCH] feat(payment_methods): added kv support for payment_methods table (#4311) Co-authored-by: Akshay S Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- crates/diesel_models/src/kv.rs | 21 +- crates/diesel_models/src/payment_method.rs | 68 +- .../diesel_models/src/query/payment_method.rs | 4 +- crates/router/src/core/mandate.rs | 24 +- crates/router/src/core/payment_methods.rs | 6 + .../router/src/core/payment_methods/cards.rs | 85 ++- crates/router/src/core/payments.rs | 1 + crates/router/src/core/payments/helpers.rs | 20 +- crates/router/src/core/payments/operations.rs | 3 +- .../operations/payment_complete_authorize.rs | 3 +- .../payments/operations/payment_confirm.rs | 22 +- .../payments/operations/payment_create.rs | 3 +- .../payments/operations/payment_response.rs | 6 +- .../core/payments/operations/payment_start.rs | 3 +- .../payments/operations/payment_status.rs | 8 +- .../payments/operations/payment_update.rs | 3 +- .../router/src/core/payments/tokenization.rs | 42 +- crates/router/src/core/payouts/helpers.rs | 15 +- crates/router/src/core/pm_auth.rs | 15 +- crates/router/src/core/webhooks.rs | 1 + crates/router/src/db/address.rs | 22 +- crates/router/src/db/kafka_store.rs | 14 +- crates/router/src/db/payment_method.rs | 608 ++++++++++++++---- crates/router/src/db/refund.rs | 53 +- crates/router/src/db/reverse_lookup.rs | 13 +- crates/router/src/routes/payment_methods.rs | 5 +- crates/router/src/types/api/mandates.rs | 4 +- crates/storage_impl/src/lib.rs | 10 + crates/storage_impl/src/lookup.rs | 10 +- crates/storage_impl/src/payment_method.rs | 5 + .../src/payments/payment_attempt.rs | 74 ++- .../src/payments/payment_intent.rs | 35 +- .../src/payouts/payout_attempt.rs | 35 +- crates/storage_impl/src/payouts/payouts.rs | 40 +- crates/storage_impl/src/redis/kv_store.rs | 51 +- 35 files changed, 1017 insertions(+), 315 deletions(-) create mode 100644 crates/storage_impl/src/payment_method.rs diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index 38429b414b..4d0eb48259 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -10,7 +10,7 @@ use crate::{ payouts::{Payouts, PayoutsNew, PayoutsUpdate}, refund::{Refund, RefundNew, RefundUpdate}, reverse_lookup::{ReverseLookup, ReverseLookupNew}, - PaymentIntent, PgPooledConn, + PaymentIntent, PaymentMethod, PaymentMethodNew, PaymentMethodUpdateInternal, PgPooledConn, }; #[derive(Debug, Serialize, Deserialize)] @@ -37,6 +37,7 @@ impl DBOperation { Insertable::Payouts(_) => "payouts", Insertable::PayoutAttempt(_) => "payout_attempt", Insertable::ReverseLookUp(_) => "reverse_lookup", + Insertable::PaymentMethod(_) => "payment_method", }, Self::Update { updatable } => match updatable { Updateable::PaymentIntentUpdate(_) => "payment_intent", @@ -45,6 +46,7 @@ impl DBOperation { Updateable::AddressUpdate(_) => "address", Updateable::PayoutsUpdate(_) => "payouts", Updateable::PayoutAttemptUpdate(_) => "payout_attempt", + Updateable::PaymentMethodUpdate(_) => "payment_method", }, } } @@ -59,6 +61,7 @@ pub enum DBResult { ReverseLookUp(Box), Payouts(Box), PayoutAttempt(Box), + PaymentMethod(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -86,6 +89,9 @@ impl DBOperation { Insertable::PayoutAttempt(rev) => { DBResult::PayoutAttempt(Box::new(rev.insert(conn).await?)) } + Insertable::PaymentMethod(rev) => { + DBResult::PaymentMethod(Box::new(rev.insert(conn).await?)) + } }, Self::Update { updatable } => match updatable { Updateable::PaymentIntentUpdate(a) => { @@ -106,6 +112,11 @@ impl DBOperation { Updateable::PayoutAttemptUpdate(a) => DBResult::PayoutAttempt(Box::new( a.orig.update_with_attempt_id(conn, a.update_data).await?, )), + Updateable::PaymentMethodUpdate(v) => DBResult::PaymentMethod(Box::new( + v.orig + .update_with_payment_method_id(conn, v.update_data) + .await?, + )), }, }) } @@ -142,6 +153,7 @@ pub enum Insertable { ReverseLookUp(ReverseLookupNew), Payouts(PayoutsNew), PayoutAttempt(PayoutAttemptNew), + PaymentMethod(PaymentMethodNew), } #[derive(Debug, Serialize, Deserialize)] @@ -153,6 +165,7 @@ pub enum Updateable { AddressUpdate(Box), PayoutsUpdate(PayoutsUpdateMems), PayoutAttemptUpdate(PayoutAttemptUpdateMems), + PaymentMethodUpdate(PaymentMethodUpdateMems), } #[derive(Debug, Serialize, Deserialize)] @@ -190,3 +203,9 @@ pub struct PayoutAttemptUpdateMems { pub orig: PayoutAttempt, pub update_data: PayoutAttemptUpdate, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct PaymentMethodUpdateMems { + pub orig: PaymentMethod, + pub update_data: PaymentMethodUpdateInternal, +} diff --git a/crates/diesel_models/src/payment_method.rs b/crates/diesel_models/src/payment_method.rs index 7c28f4cd28..dbdbc78aa0 100644 --- a/crates/diesel_models/src/payment_method.rs +++ b/crates/diesel_models/src/payment_method.rs @@ -6,7 +6,7 @@ use time::PrimitiveDateTime; use crate::{encryption::Encryption, enums as storage_enums, schema::payment_methods}; -#[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable)] +#[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Serialize, Deserialize)] #[diesel(table_name = payment_methods)] pub struct PaymentMethod { pub id: i32, @@ -41,7 +41,9 @@ pub struct PaymentMethod { pub network_transaction_id: Option, } -#[derive(Clone, Debug, Eq, PartialEq, Insertable, router_derive::DebugAsDisplay)] +#[derive( + Clone, Debug, Eq, PartialEq, Insertable, router_derive::DebugAsDisplay, Serialize, Deserialize, +)] #[diesel(table_name = payment_methods)] pub struct PaymentMethodNew { pub customer_id: String, @@ -138,7 +140,9 @@ pub enum PaymentMethodUpdate { }, } -#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)] +#[derive( + Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize, +)] #[diesel(table_name = payment_methods)] pub struct PaymentMethodUpdateInternal { metadata: Option, @@ -155,6 +159,29 @@ impl PaymentMethodUpdateInternal { PaymentMethod { metadata, ..source } } + + pub fn apply_changeset(self, source: PaymentMethod) -> PaymentMethod { + let Self { + metadata, + payment_method_data, + last_used_at, + network_transaction_id, + status, + connector_mandate_details, + } = self; + + PaymentMethod { + metadata: metadata.map_or(source.metadata, |v| Some(v.into())), + payment_method_data: payment_method_data.map_or(source.payment_method_data, Some), + last_used_at: last_used_at.unwrap_or(source.last_used_at), + network_transaction_id: network_transaction_id + .map_or(source.network_transaction_id, Some), + status: status.unwrap_or(source.status), + connector_mandate_details: connector_mandate_details + .map_or(source.connector_mandate_details, Some), + ..source + } + } } impl From for PaymentMethodUpdateInternal { @@ -218,3 +245,38 @@ impl From for PaymentMethodUpdateInternal { } } } + +impl From<&PaymentMethodNew> for PaymentMethod { + fn from(payment_method_new: &PaymentMethodNew) -> Self { + Self { + id: 0i32, + customer_id: payment_method_new.customer_id.clone(), + merchant_id: payment_method_new.merchant_id.clone(), + payment_method_id: payment_method_new.payment_method_id.clone(), + locker_id: payment_method_new.locker_id.clone(), + accepted_currency: payment_method_new.accepted_currency.clone(), + scheme: payment_method_new.scheme.clone(), + token: payment_method_new.token.clone(), + cardholder_name: payment_method_new.cardholder_name.clone(), + issuer_name: payment_method_new.issuer_name.clone(), + issuer_country: payment_method_new.issuer_country.clone(), + payer_country: payment_method_new.payer_country.clone(), + is_stored: payment_method_new.is_stored, + swift_code: payment_method_new.swift_code.clone(), + direct_debit_token: payment_method_new.direct_debit_token.clone(), + created_at: payment_method_new.created_at, + last_modified: payment_method_new.last_modified, + payment_method: payment_method_new.payment_method, + payment_method_type: payment_method_new.payment_method_type, + payment_method_issuer: payment_method_new.payment_method_issuer.clone(), + payment_method_issuer_code: payment_method_new.payment_method_issuer_code, + metadata: payment_method_new.metadata.clone(), + payment_method_data: payment_method_new.payment_method_data.clone(), + last_used_at: payment_method_new.last_used_at, + connector_mandate_details: payment_method_new.connector_mandate_details.clone(), + customer_acceptance: payment_method_new.customer_acceptance.clone(), + status: payment_method_new.status, + network_transaction_id: payment_method_new.network_transaction_id.clone(), + } + } +} diff --git a/crates/diesel_models/src/query/payment_method.rs b/crates/diesel_models/src/query/payment_method.rs index 7acf49a157..d3ae4409ee 100644 --- a/crates/diesel_models/src/query/payment_method.rs +++ b/crates/diesel_models/src/query/payment_method.rs @@ -151,7 +151,7 @@ impl PaymentMethod { pub async fn update_with_payment_method_id( self, conn: &PgPooledConn, - payment_method: payment_method::PaymentMethodUpdate, + payment_method: payment_method::PaymentMethodUpdateInternal, ) -> StorageResult { match generics::generic_update_with_unique_predicate_get_result::< ::Table, @@ -161,7 +161,7 @@ impl PaymentMethod { >( conn, dsl::payment_method_id.eq(self.payment_method_id.to_owned()), - payment_method::PaymentMethodUpdateInternal::from(payment_method), + payment_method, ) .await { diff --git a/crates/router/src/core/mandate.rs b/crates/router/src/core/mandate.rs index e3eb9b0c97..54fec6ab4a 100644 --- a/crates/router/src/core/mandate.rs +++ b/crates/router/src/core/mandate.rs @@ -43,7 +43,13 @@ pub async fn get_mandate( .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; Ok(services::ApplicationResponse::Json( - mandates::MandateResponse::from_db_mandate(&state, key_store, mandate).await?, + mandates::MandateResponse::from_db_mandate( + &state, + key_store, + mandate, + merchant_account.storage_scheme, + ) + .await?, )) } @@ -226,8 +232,13 @@ pub async fn get_customer_mandates( let mut response_vec = Vec::with_capacity(mandates.len()); for mandate in mandates { response_vec.push( - mandates::MandateResponse::from_db_mandate(&state, key_store.clone(), mandate) - .await?, + mandates::MandateResponse::from_db_mandate( + &state, + key_store.clone(), + mandate, + merchant_account.storage_scheme, + ) + .await?, ); } Ok(services::ApplicationResponse::Json(response_vec)) @@ -471,7 +482,12 @@ pub async fn retrieve_mandates_list( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Unable to retrieve mandates")?; let mandates_list = future::try_join_all(mandates.into_iter().map(|mandate| { - mandates::MandateResponse::from_db_mandate(&state, key_store.clone(), mandate) + mandates::MandateResponse::from_db_mandate( + &state, + key_store.clone(), + mandate, + merchant_account.storage_scheme, + ) })) .await?; Ok(services::ApplicationResponse::Json(mandates_list)) diff --git a/crates/router/src/core/payment_methods.rs b/crates/router/src/core/payment_methods.rs index dd109e99cb..2b7577e1fb 100644 --- a/crates/router/src/core/payment_methods.rs +++ b/crates/router/src/core/payment_methods.rs @@ -38,6 +38,7 @@ pub trait PaymentMethodRetrieve { payment_intent: &PaymentIntent, card_token_data: Option<&CardToken>, customer: &Option, + storage_scheme: common_enums::enums::MerchantStorageScheme, ) -> RouterResult; } @@ -122,6 +123,7 @@ impl PaymentMethodRetrieve for Oss { payment_intent: &PaymentIntent, card_token_data: Option<&CardToken>, customer: &Option, + storage_scheme: common_enums::enums::MerchantStorageScheme, ) -> RouterResult { let token = match token_data { storage::PaymentTokenData::TemporaryGeneric(generic_token) => { @@ -172,6 +174,8 @@ impl PaymentMethodRetrieve for Oss { .unwrap_or(&card_token.token), payment_intent, card_token_data, + merchant_key_store, + storage_scheme, ) .await .map(|card| Some((card, enums::PaymentMethod::Card)))? @@ -201,6 +205,8 @@ impl PaymentMethodRetrieve for Oss { .unwrap_or(&card_token.token), payment_intent, card_token_data, + merchant_key_store, + storage_scheme, ) .await .map(|card| Some((card, enums::PaymentMethod::Card)))? diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index fc933ea28d..1495b2ba24 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -18,6 +18,7 @@ use api_models::{ pm_auth::PaymentMethodAuthConfig, surcharge_decision_configs as api_surcharge_decision_configs, }; +use common_enums::enums::MerchantStorageScheme; use common_utils::{ consts, ext_traits::{AsyncExt, Encode, StringExt, ValueExt}, @@ -88,6 +89,7 @@ pub async fn create_payment_method( key_store: &domain::MerchantKeyStore, connector_mandate_details: Option, network_transaction_id: Option, + storage_scheme: MerchantStorageScheme, ) -> errors::CustomResult { let customer = db .find_customer_by_customer_id_merchant_id(customer_id, merchant_id, key_store) @@ -95,22 +97,25 @@ pub async fn create_payment_method( .to_not_found_response(errors::ApiErrorResponse::CustomerNotFound)?; let response = db - .insert_payment_method(storage::PaymentMethodNew { - customer_id: customer_id.to_string(), - merchant_id: merchant_id.to_string(), - payment_method_id: payment_method_id.to_string(), - locker_id, - payment_method: req.payment_method, - payment_method_type: req.payment_method_type, - payment_method_issuer: req.payment_method_issuer.clone(), - scheme: req.card_network.clone(), - metadata: pm_metadata.map(masking::Secret::new), - payment_method_data, - connector_mandate_details, - customer_acceptance: customer_acceptance.map(masking::Secret::new), - network_transaction_id: network_transaction_id.to_owned(), - ..storage::PaymentMethodNew::default() - }) + .insert_payment_method( + storage::PaymentMethodNew { + customer_id: customer_id.to_string(), + merchant_id: merchant_id.to_string(), + payment_method_id: payment_method_id.to_string(), + locker_id, + payment_method: req.payment_method, + payment_method_type: req.payment_method_type, + payment_method_issuer: req.payment_method_issuer.clone(), + scheme: req.card_network.clone(), + metadata: pm_metadata.map(masking::Secret::new), + payment_method_data, + connector_mandate_details, + customer_acceptance: customer_acceptance.map(masking::Secret::new), + network_transaction_id: network_transaction_id.to_owned(), + ..storage::PaymentMethodNew::default() + }, + storage_scheme, + ) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to add payment method in db")?; @@ -122,6 +127,7 @@ pub async fn create_payment_method( key_store.clone(), customer_id, payment_method_id.to_owned(), + storage_scheme, ) .await .map_err(|err| logger::error!(error=?err,"Failed to set the payment method as default")); @@ -169,13 +175,18 @@ pub async fn get_or_insert_payment_method( let mut payment_method_id = resp.payment_method_id.clone(); let mut locker_id = None; let payment_method = { - let existing_pm_by_pmid = db.find_payment_method(&payment_method_id).await; + let existing_pm_by_pmid = db + .find_payment_method(&payment_method_id, merchant_account.storage_scheme) + .await; if let Err(err) = existing_pm_by_pmid { if err.current_context().is_db_not_found() { locker_id = Some(payment_method_id.clone()); let existing_pm_by_locker_id = db - .find_payment_method_by_locker_id(&payment_method_id) + .find_payment_method_by_locker_id( + &payment_method_id, + merchant_account.storage_scheme, + ) .await; match &existing_pm_by_locker_id { @@ -208,6 +219,7 @@ pub async fn get_or_insert_payment_method( locker_id, None, None, + merchant_account.storage_scheme, ) .await } else { @@ -366,10 +378,14 @@ pub async fn add_payment_method( payment_method_data: pm_data_encrypted, }; - db.update_payment_method(existing_pm, pm_update) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to add payment method in db")?; + db.update_payment_method( + existing_pm, + pm_update, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to add payment method in db")?; } } }, @@ -396,6 +412,7 @@ pub async fn add_payment_method( locker_id, None, None, + merchant_account.storage_scheme, ) .await?; } @@ -417,6 +434,7 @@ pub async fn insert_payment_method( locker_id: Option, connector_mandate_details: Option, network_transaction_id: Option, + storage_scheme: MerchantStorageScheme, ) -> errors::RouterResult { let pm_card_details = resp .card @@ -436,6 +454,7 @@ pub async fn insert_payment_method( key_store, connector_mandate_details, network_transaction_id, + storage_scheme, ) .await } @@ -842,11 +861,12 @@ pub async fn update_payment_method( db: &dyn db::StorageInterface, pm: payment_method::PaymentMethod, pm_metadata: serde_json::Value, + storage_scheme: MerchantStorageScheme, ) -> errors::CustomResult<(), errors::VaultError> { let pm_update = payment_method::PaymentMethodUpdate::MetadataUpdate { metadata: Some(pm_metadata), }; - db.update_payment_method(pm, pm_update) + db.update_payment_method(pm, pm_update, storage_scheme) .await .change_context(errors::VaultError::UpdateInPaymentMethodDataTableFailed)?; Ok(()) @@ -856,12 +876,13 @@ pub async fn update_payment_method_connector_mandate_details( db: &dyn db::StorageInterface, pm: payment_method::PaymentMethod, connector_mandate_details: Option, + storage_scheme: MerchantStorageScheme, ) -> errors::CustomResult<(), errors::VaultError> { let pm_update = payment_method::PaymentMethodUpdate::ConnectorMandateDetailsUpdate { connector_mandate_details, }; - db.update_payment_method(pm, pm_update) + db.update_payment_method(pm, pm_update, storage_scheme) .await .change_context(errors::VaultError::UpdateInPaymentMethodDataTableFailed)?; Ok(()) @@ -3233,6 +3254,7 @@ pub async fn set_default_payment_method( key_store: domain::MerchantKeyStore, customer_id: &str, payment_method_id: String, + storage_scheme: MerchantStorageScheme, ) -> errors::RouterResponse { //check for the customer let customer = db @@ -3241,7 +3263,7 @@ pub async fn set_default_payment_method( .to_not_found_response(errors::ApiErrorResponse::CustomerNotFound)?; // check for the presence of payment_method let payment_method = db - .find_payment_method(&payment_method_id) + .find_payment_method(&payment_method_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; @@ -3291,18 +3313,19 @@ pub async fn set_default_payment_method( pub async fn update_last_used_at( pm_id: &str, state: &routes::AppState, + storage_scheme: MerchantStorageScheme, ) -> errors::RouterResult<()> { let update_last_used = storage::PaymentMethodUpdate::LastUsedUpdate { last_used_at: common_utils::date_time::now(), }; let payment_method = state .store - .find_payment_method(pm_id) + .find_payment_method(pm_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; state .store - .update_payment_method(payment_method, update_last_used) + .update_payment_method(payment_method, update_last_used, storage_scheme) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update the last_used_at in db")?; @@ -3452,10 +3475,11 @@ pub async fn retrieve_payment_method( state: routes::AppState, pm: api::PaymentMethodId, key_store: domain::MerchantKeyStore, + merchant_account: domain::MerchantAccount, ) -> errors::RouterResponse { let db = state.store.as_ref(); let pm = db - .find_payment_method(&pm.payment_method_id) + .find_payment_method(&pm.payment_method_id, merchant_account.storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; @@ -3510,7 +3534,10 @@ pub async fn delete_payment_method( ) -> errors::RouterResponse { let db = state.store.as_ref(); let key = db - .find_payment_method(pm_id.payment_method_id.as_str()) + .find_payment_method( + pm_id.payment_method_id.as_str(), + merchant_account.storage_scheme, + ) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index e4fd17fcbe..e74a8c1d1e 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -3561,6 +3561,7 @@ pub async fn payment_external_authentication( &payment_attempt, &payment_intent, &key_store, + storage_scheme, ) .await? .ok_or(errors::ApiErrorResponse::InternalServerError) diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 624e5e7fea..f8aa463a44 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -463,7 +463,7 @@ pub async fn get_token_pm_type_mandate_details( RecurringDetails::PaymentMethodId(payment_method_id) => { let payment_method_info = state .store - .find_payment_method(payment_method_id) + .find_payment_method(payment_method_id, merchant_account.storage_scheme) .await .to_not_found_response( errors::ApiErrorResponse::PaymentMethodNotFound, @@ -597,7 +597,7 @@ pub async fn get_token_for_recurring_mandate( )?; let payment_method = db - .find_payment_method(payment_method_id.as_str()) + .find_payment_method(payment_method_id.as_str(), merchant_account.storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; @@ -1688,6 +1688,8 @@ pub async fn retrieve_card_with_permanent_token( payment_method_id: &str, payment_intent: &PaymentIntent, card_token_data: Option<&CardToken>, + _merchant_key_store: &domain::MerchantKeyStore, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult { let customer_id = payment_intent .customer_id @@ -1733,20 +1735,21 @@ pub async fn retrieve_card_with_permanent_token( card_issuing_country: None, bank_code: None, }; - cards::update_last_used_at(payment_method_id, state).await?; + cards::update_last_used_at(payment_method_id, state, storage_scheme).await?; Ok(api::PaymentMethodData::Card(api_card)) } pub async fn retrieve_payment_method_from_db_with_token_data( state: &AppState, token_data: &storage::PaymentTokenData, + storage_scheme: storage::enums::MerchantStorageScheme, ) -> RouterResult> { match token_data { storage::PaymentTokenData::PermanentCard(data) => { if let Some(ref payment_method_id) = data.payment_method_id { state .store - .find_payment_method(payment_method_id) + .find_payment_method(payment_method_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound) .attach_printable("error retrieving payment method from DB") @@ -1758,7 +1761,7 @@ pub async fn retrieve_payment_method_from_db_with_token_data( storage::PaymentTokenData::WalletToken(data) => state .store - .find_payment_method(&data.payment_method_id) + .find_payment_method(&data.payment_method_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound) .attach_printable("error retrieveing payment method from DB") @@ -1827,6 +1830,7 @@ pub async fn make_pm_data<'a, F: Clone, R, Ctx: PaymentMethodRetrieve>( payment_data: &mut PaymentData, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, + storage_scheme: common_enums::enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, R, Ctx>, Option, @@ -1859,6 +1863,7 @@ pub async fn make_pm_data<'a, F: Clone, R, Ctx: PaymentMethodRetrieve>( &payment_data.payment_intent, card_token_data.as_ref(), customer, + storage_scheme, ) .await; @@ -4103,6 +4108,7 @@ pub async fn get_payment_method_details_from_payment_token( payment_attempt: &PaymentAttempt, payment_intent: &PaymentIntent, key_store: &domain::MerchantKeyStore, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> { let hyperswitch_token = if let Some(token) = payment_attempt.payment_token.clone() { let redis_conn = state @@ -4184,6 +4190,8 @@ pub async fn get_payment_method_details_from_payment_token( .unwrap_or(&card_token.token), payment_intent, None, + key_store, + storage_scheme, ) .await .map(|card| Some((card, enums::PaymentMethod::Card))), @@ -4197,6 +4205,8 @@ pub async fn get_payment_method_details_from_payment_token( .unwrap_or(&card_token.token), payment_intent, None, + key_store, + storage_scheme, ) .await .map(|card| Some((card, enums::PaymentMethod::Card))), diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index b9f9618fa6..cad02fc721 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -276,7 +276,7 @@ where &'a self, state: &'a AppState, payment_data: &mut PaymentData, - _storage_scheme: enums::MerchantStorageScheme, + storage_scheme: enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -290,6 +290,7 @@ where payment_data, merchant_key_store, customer, + storage_scheme, ) .await } diff --git a/crates/router/src/core/payments/operations/payment_complete_authorize.rs b/crates/router/src/core/payments/operations/payment_complete_authorize.rs index ccefb3b2b5..e1a16b6f4a 100644 --- a/crates/router/src/core/payments/operations/payment_complete_authorize.rs +++ b/crates/router/src/core/payments/operations/payment_complete_authorize.rs @@ -349,7 +349,7 @@ impl Domain, - _storage_scheme: storage_enums::MerchantStorageScheme, + storage_scheme: storage_enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -363,6 +363,7 @@ impl Domain ) .await?; - let payment_method_info = - helpers::retrieve_payment_method_from_db_with_token_data(state, &token_data) - .await?; + let payment_method_info = helpers::retrieve_payment_method_from_db_with_token_data( + state, + &token_data, + storage_scheme, + ) + .await?; (Some(token_data), payment_method_info) } else { @@ -672,7 +675,7 @@ impl Domain, - _storage_scheme: storage_enums::MerchantStorageScheme, + storage_scheme: storage_enums::MerchantStorageScheme, key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -680,8 +683,15 @@ impl Domain, Option, )> { - let (op, payment_method_data, pm_id) = - helpers::make_pm_data(Box::new(self), state, payment_data, key_store, customer).await?; + let (op, payment_method_data, pm_id) = helpers::make_pm_data( + Box::new(self), + state, + payment_data, + key_store, + customer, + storage_scheme, + ) + .await?; utils::when(payment_method_data.is_none(), || { Err(errors::ApiErrorResponse::PaymentMethodNotFound) diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 2c98052ad3..c6e95b176c 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -482,7 +482,7 @@ impl Domain, - _storage_scheme: enums::MerchantStorageScheme, + storage_scheme: enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -496,6 +496,7 @@ impl Domain( &mut payment_data, router_data.status, router_data.response.clone(), + storage_scheme, ) .await?; let m_db = state.clone().store; @@ -965,11 +966,12 @@ async fn update_payment_method_status_and_ntid( payment_data: &mut PaymentData, attempt_status: common_enums::AttemptStatus, payment_response: Result, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<()> { if let Some(id) = &payment_data.payment_attempt.payment_method_id { let pm = state .store - .find_payment_method(id) + .find_payment_method(id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; @@ -1036,7 +1038,7 @@ async fn update_payment_method_status_and_ntid( state .store - .update_payment_method(pm, pm_update) + .update_payment_method(pm, pm_update, storage_scheme) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update payment method in db")?; diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 465a2e5818..560d2014ba 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -299,7 +299,7 @@ where &'a self, state: &'a AppState, payment_data: &mut PaymentData, - _storage_scheme: storage_enums::MerchantStorageScheme, + storage_scheme: storage_enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -320,6 +320,7 @@ where payment_data, merchant_key_store, customer, + storage_scheme, ) .await } else { diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index f25b89977f..6739c2c3ba 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -94,7 +94,7 @@ impl Domain, - _storage_scheme: enums::MerchantStorageScheme, + storage_scheme: enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -108,6 +108,7 @@ impl Domain Some(payment_method), Err(error) => { if error.current_context().is_db_not_found() { diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index 76c28307db..934c0edfdd 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -487,7 +487,7 @@ impl Domain, - _storage_scheme: storage_enums::MerchantStorageScheme, + storage_scheme: storage_enums::MerchantStorageScheme, merchant_key_store: &domain::MerchantKeyStore, customer: &Option, ) -> RouterResult<( @@ -501,6 +501,7 @@ impl Domain match duplication_check { payment_methods::transformers::DataDuplicationCheck::Duplicated => { let payment_method = { - let existing_pm_by_pmid = - db.find_payment_method(&payment_method_id).await; + let existing_pm_by_pmid = db + .find_payment_method( + &payment_method_id, + merchant_account.storage_scheme, + ) + .await; if let Err(err) = existing_pm_by_pmid { if err.current_context().is_db_not_found() { locker_id = Some(payment_method_id.clone()); let existing_pm_by_locker_id = db - .find_payment_method_by_locker_id(&payment_method_id) + .find_payment_method_by_locker_id( + &payment_method_id, + merchant_account.storage_scheme, + ) .await; match &existing_pm_by_locker_id { @@ -257,6 +264,7 @@ where db, pm.clone(), metadata, + merchant_account.storage_scheme, ) .await .change_context( @@ -277,7 +285,7 @@ where ) .await?; - payment_methods::cards::update_payment_method_connector_mandate_details(db, pm, connector_mandate_details).await.change_context( + payment_methods::cards::update_payment_method_connector_mandate_details(db, pm, connector_mandate_details, merchant_account.storage_scheme).await.change_context( errors::ApiErrorResponse::InternalServerError, ) .attach_printable("Failed to update payment method in db")?; @@ -300,6 +308,7 @@ where key_store, connector_mandate_details, network_transaction_id, + merchant_account.storage_scheme, ) .await } else { @@ -315,8 +324,12 @@ where payment_methods::transformers::DataDuplicationCheck::MetaDataChanged => { if let Some(card) = payment_method_create_request.card.clone() { let payment_method = { - let existing_pm_by_pmid = - db.find_payment_method(&payment_method_id).await; + let existing_pm_by_pmid = db + .find_payment_method( + &payment_method_id, + merchant_account.storage_scheme, + ) + .await; if let Err(err) = existing_pm_by_pmid { if err.current_context().is_db_not_found() { @@ -324,6 +337,7 @@ where let existing_pm_by_locker_id = db .find_payment_method_by_locker_id( &payment_method_id, + merchant_account.storage_scheme, ) .await; @@ -362,7 +376,7 @@ where ) .await?; - payment_methods::cards::update_payment_method_connector_mandate_details(db, pm.clone(), connector_mandate_details).await.change_context( + payment_methods::cards::update_payment_method_connector_mandate_details(db, pm.clone(), connector_mandate_details, merchant_account.storage_scheme).await.change_context( errors::ApiErrorResponse::InternalServerError, ) .attach_printable("Failed to update payment method in db")?; @@ -383,6 +397,7 @@ where locker_id, connector_mandate_details, network_transaction_id, + merchant_account.storage_scheme, ) .await } else { @@ -476,10 +491,14 @@ where payment_method_data: pm_data_encrypted, }; - db.update_payment_method(existing_pm, pm_update) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to add payment method in db")?; + db.update_payment_method( + existing_pm, + pm_update, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to add payment method in db")?; } } }, @@ -506,6 +525,7 @@ where key_store, connector_mandate_details, network_transaction_id, + merchant_account.storage_scheme, ) .await?; } diff --git a/crates/router/src/core/payouts/helpers.rs b/crates/router/src/core/payouts/helpers.rs index 7de5a95bd8..ef54cedcb8 100644 --- a/crates/router/src/core/payouts/helpers.rs +++ b/crates/router/src/core/payouts/helpers.rs @@ -298,7 +298,9 @@ pub async fn save_payout_data_to_locker( let locker_ref = stored_resp.card_reference.clone(); // Use locker ref as payment_method_id - let existing_pm_by_pmid = db.find_payment_method(&locker_ref).await; + let existing_pm_by_pmid = db + .find_payment_method(&locker_ref, merchant_account.storage_scheme) + .await; match existing_pm_by_pmid { // If found, update locker's metadata [DELETE + INSERT OP], don't insert in payment_method's table @@ -314,7 +316,13 @@ pub async fn save_payout_data_to_locker( // If not found, use locker ref as locker_id Err(err) => { if err.current_context().is_db_not_found() { - match db.find_payment_method_by_locker_id(&locker_ref).await { + match db + .find_payment_method_by_locker_id( + &locker_ref, + merchant_account.storage_scheme, + ) + .await + { // If found, update locker's metadata [DELETE + INSERT OP], don't insert in payment_methods table Ok(pm) => ( false, @@ -480,6 +488,7 @@ pub async fn save_payout_data_to_locker( key_store, None, None, + merchant_account.storage_scheme, ) .await?; } @@ -537,7 +546,7 @@ pub async fn save_payout_data_to_locker( let pm_update = storage::PaymentMethodUpdate::PaymentMethodDataUpdate { payment_method_data: card_details_encrypted, }; - db.update_payment_method(existing_pm, pm_update) + db.update_payment_method(existing_pm, pm_update, merchant_account.storage_scheme) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to add payment method in db")?; diff --git a/crates/router/src/core/pm_auth.rs b/crates/router/src/core/pm_auth.rs index 715a7ae12a..57afa55096 100644 --- a/crates/router/src/core/pm_auth.rs +++ b/crates/router/src/core/pm_auth.rs @@ -5,7 +5,7 @@ use api_models::{ payment_methods::{self, BankAccountAccessCreds}, payments::{AddressDetails, BankDebitBilling, BankDebitData, PaymentMethodData}, }; -use common_enums::PaymentMethodType; +use common_enums::{enums::MerchantStorageScheme, PaymentMethodType}; use hex; pub mod helpers; pub mod transformers; @@ -455,7 +455,13 @@ async fn store_bank_details_in_payment_methods( }; } - store_in_db(update_entries, new_entries, db).await?; + store_in_db( + update_entries, + new_entries, + db, + merchant_account.storage_scheme, + ) + .await?; Ok(()) } @@ -464,15 +470,16 @@ async fn store_in_db( update_entries: Vec<(storage::PaymentMethod, storage::PaymentMethodUpdate)>, new_entries: Vec, db: &dyn StorageInterface, + storage_scheme: MerchantStorageScheme, ) -> RouterResult<()> { let update_entries_futures = update_entries .into_iter() - .map(|(pm, pm_update)| db.update_payment_method(pm, pm_update)) + .map(|(pm, pm_update)| db.update_payment_method(pm, pm_update, storage_scheme)) .collect::>(); let new_entries_futures = new_entries .into_iter() - .map(|pm_new| db.insert_payment_method(pm_new)) + .map(|pm_new| db.insert_payment_method(pm_new, storage_scheme)) .collect::>(); let update_futures = futures::future::join_all(update_entries_futures); diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 031cf28c71..558eed0fef 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -458,6 +458,7 @@ pub async fn mandates_incoming_webhook_flow( &state, key_store.clone(), updated_mandate.clone(), + merchant_account.storage_scheme, ) .await?, ); diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index fc68f0e303..8eb281184f 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -275,7 +275,7 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation}; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; use super::AddressInterface; use crate::{ @@ -335,7 +335,10 @@ mod storage { let address = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", merchant_id, payment_id); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; let field = format!("add_{}", address_id); Box::pin(db_utils::try_redis_get_else_try_database_get( async { @@ -406,7 +409,11 @@ mod storage { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", address.merchant_id.clone(), payment_id); + let merchant_id = address.merchant_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; let field = format!("add_{}", address.address_id); let updated_address = AddressUpdateInternal::from(address_update.clone()) .create_address(address.clone()); @@ -430,7 +437,7 @@ mod storage { (&field, redis_value), redis_entry, ), - &key, + key, ) .await .change_context(errors::StorageError::KVError)? @@ -475,7 +482,10 @@ mod storage { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", merchant_id, payment_id); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id, + }; let field = format!("add_{}", &address_new.address_id); let created_address = diesel_models::Address { id: Some(0i32), @@ -513,7 +523,7 @@ mod storage { &created_address, redis_entry, ), - &key, + key, ) .await .change_context(errors::StorageError::KVError)? diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index f0effcbcac..0a56f4c27d 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -1382,9 +1382,10 @@ impl PaymentMethodInterface for KafkaStore { async fn find_payment_method( &self, payment_method_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .find_payment_method(payment_method_id) + .find_payment_method(payment_method_id, storage_scheme) .await } @@ -1434,26 +1435,31 @@ impl PaymentMethodInterface for KafkaStore { async fn find_payment_method_by_locker_id( &self, locker_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .find_payment_method_by_locker_id(locker_id) + .find_payment_method_by_locker_id(locker_id, storage_scheme) .await } async fn insert_payment_method( &self, m: storage::PaymentMethodNew, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store.insert_payment_method(m).await + self.diesel_store + .insert_payment_method(m, storage_scheme) + .await } async fn update_payment_method( &self, payment_method: storage::PaymentMethod, payment_method_update: storage::PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .update_payment_method(payment_method, payment_method_update) + .update_payment_method(payment_method, payment_method_update, storage_scheme) .await } diff --git a/crates/router/src/db/payment_method.rs b/crates/router/src/db/payment_method.rs index 188eeeddc1..26581db3bf 100644 --- a/crates/router/src/db/payment_method.rs +++ b/crates/router/src/db/payment_method.rs @@ -1,12 +1,10 @@ use diesel_models::payment_method::PaymentMethodUpdateInternal; -use error_stack::{report, ResultExt}; -use router_env::{instrument, tracing}; +use error_stack::ResultExt; -use super::{MockDb, Store}; +use super::MockDb; use crate::{ - connection, core::errors::{self, CustomResult}, - types::storage, + types::storage::{self as storage_types, enums::MerchantStorageScheme}, }; #[async_trait::async_trait] @@ -14,19 +12,21 @@ pub trait PaymentMethodInterface { async fn find_payment_method( &self, payment_method_id: &str, - ) -> CustomResult; + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn find_payment_method_by_locker_id( &self, locker_id: &str, - ) -> CustomResult; + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn find_payment_method_by_customer_id_merchant_id_list( &self, customer_id: &str, merchant_id: &str, limit: Option, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; async fn find_payment_method_by_customer_id_merchant_id_status( &self, @@ -34,7 +34,7 @@ pub trait PaymentMethodInterface { merchant_id: &str, status: common_enums::PaymentMethodStatus, limit: Option, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; async fn get_payment_method_count_by_customer_id_merchant_id_status( &self, @@ -45,140 +45,482 @@ pub trait PaymentMethodInterface { async fn insert_payment_method( &self, - payment_method_new: storage::PaymentMethodNew, - ) -> CustomResult; + payment_method_new: storage_types::PaymentMethodNew, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn update_payment_method( &self, - payment_method: storage::PaymentMethod, - payment_method_update: storage::PaymentMethodUpdate, - ) -> CustomResult; + payment_method: storage_types::PaymentMethod, + payment_method_update: storage_types::PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn delete_payment_method_by_merchant_id_payment_method_id( &self, merchant_id: &str, payment_method_id: &str, - ) -> CustomResult; + ) -> CustomResult; } -#[async_trait::async_trait] -impl PaymentMethodInterface for Store { - #[instrument(skip_all)] - async fn find_payment_method( - &self, - payment_method_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) +#[cfg(feature = "kv_store")] +mod storage { + use common_utils::fallback_reverse_lookup_not_found; + use diesel_models::{kv, PaymentMethodUpdateInternal}; + use error_stack::{report, ResultExt}; + use redis_interface::HsetnxReply; + use router_env::{instrument, tracing}; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + + use super::PaymentMethodInterface; + use crate::{ + connection, + core::errors::{self, utils::RedisErrorExt, CustomResult}, + db::reverse_lookup::ReverseLookupInterface, + services::Store, + types::storage::{self as storage_types, enums::MerchantStorageScheme}, + utils::db_utils, + }; + + #[async_trait::async_trait] + impl PaymentMethodInterface for Store { + #[instrument(skip_all)] + async fn find_payment_method( + &self, + payment_method_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let lookup_id = format!("payment_method_{}", payment_method_id); + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await, + database_call().await + ); + + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; + + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + } + } + + #[instrument(skip_all)] + async fn find_payment_method_by_locker_id( + &self, + locker_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let lookup_id = format!("payment_method_locker_{}", locker_id); + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await, + database_call().await + ); + + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; + + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + } + } + // not supported in kv + #[instrument(skip_all)] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &str, + merchant_id: &str, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + ) .await .map_err(|error| report!(errors::StorageError::from(error))) - } + } - #[instrument(skip_all)] - async fn find_payment_method_by_locker_id( - &self, - locker_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::PaymentMethod::find_by_locker_id(&conn, locker_id) + #[instrument(skip_all)] + async fn insert_payment_method( + &self, + payment_method_new: storage_types::PaymentMethodNew, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + let conn = connection::pg_connection_write(self).await?; + payment_method_new + .insert(&conn) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + MerchantStorageScheme::RedisKv => { + let merchant_id = payment_method_new.merchant_id.clone(); + let customer_id = payment_method_new.customer_id.clone(); + + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let key_str = key.to_string(); + let field = + format!("payment_method_id_{}", payment_method_new.payment_method_id); + + let reverse_lookup_entry = |v: String| diesel_models::ReverseLookupNew { + sk_id: field.clone(), + pk_id: key_str.clone(), + lookup_id: v, + source: "payment_method".to_string(), + updated_by: storage_scheme.to_string(), + }; + + let lookup_id1 = + format!("payment_method_{}", &payment_method_new.payment_method_id); + let mut reverse_lookups = vec![lookup_id1]; + if let Some(locker_id) = &payment_method_new.locker_id { + reverse_lookups.push(format!("payment_method_locker_{}", locker_id)) + } + + let results = reverse_lookups.into_iter().map(|v| { + self.insert_reverse_lookup(reverse_lookup_entry(v), storage_scheme) + }); + + futures::future::try_join_all(results).await?; + + let storage_payment_method = (&payment_method_new).into(); + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::PaymentMethod(payment_method_new), + }, + }; + + match kv_wrapper::( + self, + KvOperation::::HSetNx( + &field, + &storage_payment_method, + redis_entry, + ), + key, + ) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hsetnx() + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "payment_method", + key: Some(storage_payment_method.payment_method_id), + } + .into()), + Ok(HsetnxReply::KeySet) => Ok(storage_payment_method), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } + } + + #[instrument(skip_all)] + async fn update_payment_method( + &self, + payment_method: storage_types::PaymentMethod, + payment_method_update: storage_types::PaymentMethodUpdate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + let conn = connection::pg_connection_write(self).await?; + payment_method + .update_with_payment_method_id(&conn, payment_method_update.into()) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + MerchantStorageScheme::RedisKv => { + let merchant_id = payment_method.merchant_id.clone(); + let customer_id = payment_method.customer_id.clone(); + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let key_str = key.to_string(); + let field = format!("payment_method_id_{}", payment_method.payment_method_id); + + let p_update: PaymentMethodUpdateInternal = payment_method_update.into(); + let updated_payment_method = + p_update.clone().apply_changeset(payment_method.clone()); + + let redis_value = serde_json::to_string(&updated_payment_method) + .change_context(errors::StorageError::SerializationFailed)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::PaymentMethodUpdate( + kv::PaymentMethodUpdateMems { + orig: payment_method, + update_data: p_update, + }, + ), + }, + }; + + kv_wrapper::<(), _, _>( + self, + KvOperation::::Hset( + (&field, redis_value), + redis_entry, + ), + key, + ) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hset() + .change_context(errors::StorageError::KVError)?; + + Ok(updated_payment_method) + } + } + } + + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + customer_id: &str, + merchant_id: &str, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + limit, + ) .await .map_err(|error| report!(errors::StorageError::from(error))) - } + } - #[instrument(skip_all)] - async fn get_payment_method_count_by_customer_id_merchant_id_status( - &self, - customer_id: &str, - merchant_id: &str, - status: common_enums::PaymentMethodStatus, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::PaymentMethod::get_count_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[instrument(skip_all)] - async fn insert_payment_method( - &self, - payment_method_new: storage::PaymentMethodNew, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - payment_method_new - .insert(&conn) + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + customer_id: &str, + merchant_id: &str, + status: common_enums::PaymentMethodStatus, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + limit, + ) .await .map_err(|error| report!(errors::StorageError::from(error))) - } + } - #[instrument(skip_all)] - async fn update_payment_method( - &self, - payment_method: storage::PaymentMethod, - payment_method_update: storage::PaymentMethodUpdate, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - payment_method - .update_with_payment_method_id(&conn, payment_method_update) + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + merchant_id: &str, + payment_method_id: &str, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id( + &conn, + merchant_id, + payment_method_id, + ) .await .map_err(|error| report!(errors::StorageError::from(error))) + } } +} - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_list( - &self, - customer_id: &str, - merchant_id: &str, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - storage::PaymentMethod::find_by_customer_id_merchant_id( - &conn, - customer_id, - merchant_id, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } +#[cfg(not(feature = "kv_store"))] +mod storage { + use error_stack::report; + use router_env::{instrument, tracing}; - #[instrument(skip_all)] - async fn find_payment_method_by_customer_id_merchant_id_status( - &self, - customer_id: &str, - merchant_id: &str, - status: common_enums::PaymentMethodStatus, - limit: Option, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - storage::PaymentMethod::find_by_customer_id_merchant_id_status( - &conn, - customer_id, - merchant_id, - status, - limit, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } + use super::PaymentMethodInterface; + use crate::{ + connection, + core::errors::{self, CustomResult}, + services::Store, + types::storage::{self as storage_types, enums::MerchantStorageScheme}, + }; + #[async_trait::async_trait] + impl PaymentMethodInterface for Store { + #[instrument(skip_all)] + async fn find_payment_method( + &self, + payment_method_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } - async fn delete_payment_method_by_merchant_id_payment_method_id( - &self, - merchant_id: &str, - payment_method_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage::PaymentMethod::delete_by_merchant_id_payment_method_id( - &conn, - merchant_id, - payment_method_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) + #[instrument(skip_all)] + async fn find_payment_method_by_locker_id( + &self, + locker_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_locker_id(&conn, locker_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn get_payment_method_count_by_customer_id_merchant_id_status( + &self, + customer_id: &str, + merchant_id: &str, + status: common_enums::PaymentMethodStatus, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::get_count_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn insert_payment_method( + &self, + payment_method_new: storage_types::PaymentMethodNew, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + payment_method_new + .insert(&conn) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn update_payment_method( + &self, + payment_method: storage_types::PaymentMethod, + payment_method_update: storage_types::PaymentMethodUpdate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + payment_method + .update_with_payment_method_id(&conn, payment_method_update.into()) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_list( + &self, + customer_id: &str, + merchant_id: &str, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_customer_id_merchant_id( + &conn, + customer_id, + merchant_id, + limit, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn find_payment_method_by_customer_id_merchant_id_status( + &self, + customer_id: &str, + merchant_id: &str, + status: common_enums::PaymentMethodStatus, + limit: Option, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::PaymentMethod::find_by_customer_id_merchant_id_status( + &conn, + customer_id, + merchant_id, + status, + limit, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + async fn delete_payment_method_by_merchant_id_payment_method_id( + &self, + merchant_id: &str, + payment_method_id: &str, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::PaymentMethod::delete_by_merchant_id_payment_method_id( + &conn, + merchant_id, + payment_method_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } } } @@ -187,7 +529,8 @@ impl PaymentMethodInterface for MockDb { async fn find_payment_method( &self, payment_method_id: &str, - ) -> CustomResult { + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; let payment_method = payment_methods .iter() @@ -206,7 +549,8 @@ impl PaymentMethodInterface for MockDb { async fn find_payment_method_by_locker_id( &self, locker_id: &str, - ) -> CustomResult { + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let payment_methods = self.payment_methods.lock().await; let payment_method = payment_methods .iter() @@ -242,11 +586,12 @@ impl PaymentMethodInterface for MockDb { async fn insert_payment_method( &self, - payment_method_new: storage::PaymentMethodNew, - ) -> CustomResult { + payment_method_new: storage_types::PaymentMethodNew, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let mut payment_methods = self.payment_methods.lock().await; - let payment_method = storage::PaymentMethod { + let payment_method = storage_types::PaymentMethod { id: i32::try_from(payment_methods.len()) .change_context(errors::StorageError::MockDbError)?, customer_id: payment_method_new.customer_id, @@ -286,9 +631,9 @@ impl PaymentMethodInterface for MockDb { customer_id: &str, merchant_id: &str, _limit: Option, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { let payment_methods = self.payment_methods.lock().await; - let payment_methods_found: Vec = payment_methods + let payment_methods_found: Vec = payment_methods .iter() .filter(|pm| pm.customer_id == customer_id && pm.merchant_id == merchant_id) .cloned() @@ -310,9 +655,9 @@ impl PaymentMethodInterface for MockDb { merchant_id: &str, status: common_enums::PaymentMethodStatus, _limit: Option, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { let payment_methods = self.payment_methods.lock().await; - let payment_methods_found: Vec = payment_methods + let payment_methods_found: Vec = payment_methods .iter() .filter(|pm| { pm.customer_id == customer_id @@ -336,7 +681,7 @@ impl PaymentMethodInterface for MockDb { &self, merchant_id: &str, payment_method_id: &str, - ) -> CustomResult { + ) -> CustomResult { let mut payment_methods = self.payment_methods.lock().await; match payment_methods.iter().position(|pm| { pm.merchant_id == merchant_id && pm.payment_method_id == payment_method_id @@ -354,9 +699,10 @@ impl PaymentMethodInterface for MockDb { async fn update_payment_method( &self, - payment_method: storage::PaymentMethod, - payment_method_update: storage::PaymentMethodUpdate, - ) -> CustomResult { + payment_method: storage_types::PaymentMethod, + payment_method_update: storage_types::PaymentMethodUpdate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let pm_update_res = self .payment_methods .lock() diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index c62f846951..ca3921d619 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -275,7 +275,7 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation}; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; use super::RefundInterface; use crate::{ @@ -315,7 +315,9 @@ mod storage { database_call().await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( @@ -347,7 +349,13 @@ mod storage { .map_err(|error| report!(errors::StorageError::from(error))) } enums::MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", new.merchant_id, new.payment_id); + let merchant_id = new.merchant_id.clone(); + let payment_id = new.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let key_str = key.to_string(); // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id // Check for database presence as well Maybe use a read replica here ? let created_refund = storage_types::Refund { @@ -398,7 +406,7 @@ mod storage { "ref_ref_id_{}_{}", created_refund.merchant_id, created_refund.refund_id ), - pk_id: key.clone(), + pk_id: key_str.clone(), source: "refund".to_string(), updated_by: storage_scheme.to_string(), }, @@ -409,7 +417,7 @@ mod storage { "ref_inter_ref_{}_{}", created_refund.merchant_id, created_refund.internal_reference_id ), - pk_id: key.clone(), + pk_id: key_str.clone(), source: "refund".to_string(), updated_by: storage_scheme.to_string(), }, @@ -424,7 +432,7 @@ mod storage { connector_refund_id, created_refund.connector ), - pk_id: key.clone(), + pk_id: key_str.clone(), source: "refund".to_string(), updated_by: storage_scheme.to_string(), }) @@ -442,10 +450,10 @@ mod storage { &created_refund, redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { @@ -488,7 +496,9 @@ mod storage { database_call().await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; let pattern = db_utils::generate_hscan_pattern_for_refund(&lookup.sk_id); @@ -524,7 +534,13 @@ mod storage { .map_err(|error| report!(errors::StorageError::from(error))) } enums::MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let key_str = key.to_string(); let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); let updated_refund = refund.clone().apply_changeset(this.clone()); @@ -547,10 +563,10 @@ mod storage { (&field, redis_value), redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hset() .change_context(errors::StorageError::KVError)?; @@ -582,7 +598,9 @@ mod storage { database_call().await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( @@ -630,7 +648,9 @@ mod storage { database_call().await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( @@ -668,7 +688,10 @@ mod storage { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; Box::pin(db_utils::try_redis_get_else_try_database_get( async { kv_wrapper( diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index 852d6359c9..121ec06ec1 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -69,7 +69,7 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::SetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation}; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; use super::{ReverseLookupInterface, Store}; use crate::{ @@ -115,7 +115,12 @@ mod storage { match kv_wrapper::( self, KvOperation::SetNx(&created_rev_lookup, redis_entry), - format!("reverse_lookup_{}", &created_rev_lookup.lookup_id), + PartitionKey::CombinationKey { + combination: &format!( + "reverse_lookup_{}", + &created_rev_lookup.lookup_id + ), + }, ) .await .map_err(|err| err.to_redis_failed_response(&created_rev_lookup.lookup_id))? @@ -153,7 +158,9 @@ mod storage { kv_wrapper( self, KvOperation::::Get, - format!("reverse_lookup_{id}"), + PartitionKey::CombinationKey { + combination: &format!("reverse_lookup_{id}"), + }, ) .await? .try_into_get() diff --git a/crates/router/src/routes/payment_methods.rs b/crates/router/src/routes/payment_methods.rs index 555410a416..6c3d51f659 100644 --- a/crates/router/src/routes/payment_methods.rs +++ b/crates/router/src/routes/payment_methods.rs @@ -201,7 +201,9 @@ pub async fn payment_method_retrieve_api( state, &req, payload, - |state, auth, pm, _| cards::retrieve_payment_method(state, pm, auth.key_store), + |state, auth, pm, _| { + cards::retrieve_payment_method(state, pm, auth.key_store, auth.merchant_account) + }, &auth::ApiKeyAuth, api_locking::LockAction::NotApplicable, )) @@ -319,6 +321,7 @@ pub async fn default_payment_method_set_api( auth.key_store, &customer_id, default_payment_method.payment_method_id, + auth.merchant_account.storage_scheme, ) }, &*ephemeral_auth, diff --git a/crates/router/src/types/api/mandates.rs b/crates/router/src/types/api/mandates.rs index 10c56973dd..dda6dc0cfc 100644 --- a/crates/router/src/types/api/mandates.rs +++ b/crates/router/src/types/api/mandates.rs @@ -28,6 +28,7 @@ pub(crate) trait MandateResponseExt: Sized { state: &AppState, key_store: domain::MerchantKeyStore, mandate: storage::Mandate, + storage_scheme: storage_enums::MerchantStorageScheme, ) -> RouterResult; } @@ -37,10 +38,11 @@ impl MandateResponseExt for MandateResponse { state: &AppState, key_store: domain::MerchantKeyStore, mandate: storage::Mandate, + storage_scheme: storage_enums::MerchantStorageScheme, ) -> RouterResult { let db = &*state.store; let payment_method = db - .find_payment_method(&mandate.payment_method_id) + .find_payment_method(&mandate.payment_method_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::PaymentMethodNotFound)?; diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index c4bbf76833..20d018a6b7 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -13,6 +13,7 @@ pub mod errors; mod lookup; pub mod metrics; pub mod mock_db; +pub mod payment_method; pub mod payments; #[cfg(feature = "payouts")] pub mod payouts; @@ -361,6 +362,15 @@ impl UniqueConstraints for diesel_models::PayoutAttempt { } } +impl UniqueConstraints for diesel_models::PaymentMethod { + fn unique_constraints(&self) -> Vec { + vec![format!("paymentmethod_{}", self.payment_method_id)] + } + fn table_name(&self) -> &str { + "PaymentMethod" + } +} + #[cfg(not(feature = "payouts"))] impl PayoutAttemptInterface for KVRouterStore {} #[cfg(not(feature = "payouts"))] diff --git a/crates/storage_impl/src/lookup.rs b/crates/storage_impl/src/lookup.rs index 12d91ba5e9..fb8c866443 100644 --- a/crates/storage_impl/src/lookup.rs +++ b/crates/storage_impl/src/lookup.rs @@ -12,7 +12,7 @@ use redis_interface::SetnxReply; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation}, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, utils::{self, try_redis_get_else_try_database_get}, DatabaseStore, KVRouterStore, RouterStore, }; @@ -94,7 +94,9 @@ impl ReverseLookupInterface for KVRouterStore { match kv_wrapper::( self, KvOperation::SetNx(&created_rev_lookup, redis_entry), - format!("reverse_lookup_{}", &created_rev_lookup.lookup_id), + PartitionKey::CombinationKey { + combination: &format!("reverse_lookup_{}", &created_rev_lookup.lookup_id), + }, ) .await .map_err(|err| err.to_redis_failed_response(&created_rev_lookup.lookup_id))? @@ -129,7 +131,9 @@ impl ReverseLookupInterface for KVRouterStore { kv_wrapper( self, KvOperation::::Get, - format!("reverse_lookup_{id}"), + PartitionKey::CombinationKey { + combination: &format!("reverse_lookup_{id}"), + }, ) .await? .try_into_get() diff --git a/crates/storage_impl/src/payment_method.rs b/crates/storage_impl/src/payment_method.rs new file mode 100644 index 0000000000..254505eaae --- /dev/null +++ b/crates/storage_impl/src/payment_method.rs @@ -0,0 +1,5 @@ +use diesel_models::PaymentMethod; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for PaymentMethod {} diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index b8b97628db..138f3658aa 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -31,7 +31,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation}, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get}, DataModelExt, DatabaseStore, KVRouterStore, RouterStore, }; @@ -339,11 +339,13 @@ impl PaymentAttemptInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { let payment_attempt = payment_attempt.populate_derived_fields(); - let key = format!( - "mid_{}_pid_{}", - payment_attempt.merchant_id, payment_attempt.payment_id - ); - + let merchant_id = payment_attempt.merchant_id.clone(); + let payment_id = payment_attempt.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let key_str = key.to_string(); let created_attempt = PaymentAttempt { id: Default::default(), payment_id: payment_attempt.payment_id.clone(), @@ -424,7 +426,7 @@ impl PaymentAttemptInterface for KVRouterStore { "pa_{}_{}", &created_attempt.merchant_id, &created_attempt.attempt_id, ), - pk_id: key.clone(), + pk_id: key_str.clone(), sk_id: field.clone(), source: "payment_attempt".to_string(), updated_by: storage_scheme.to_string(), @@ -439,15 +441,15 @@ impl PaymentAttemptInterface for KVRouterStore { &created_attempt.clone().to_storage_model(), redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "payment attempt", - key: Some(key), + key: Some(key_str), } .into()), Ok(HsetnxReply::KeySet) => Ok(created_attempt), @@ -471,7 +473,11 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &this.merchant_id, + payment_id: &this.payment_id, + }; + let key_str = key.to_string(); let old_connector_transaction_id = &this.connector_transaction_id; let old_preprocessing_id = &this.preprocessing_step_id; let updated_attempt = PaymentAttempt::from_storage_model( @@ -503,7 +509,7 @@ impl PaymentAttemptInterface for KVRouterStore { (None, Some(connector_transaction_id)) => { add_connector_txn_id_to_reverse_lookup( self, - key.as_str(), + key_str.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), connector_transaction_id.as_str(), @@ -515,7 +521,7 @@ impl PaymentAttemptInterface for KVRouterStore { if old_connector_transaction_id.ne(connector_transaction_id) { add_connector_txn_id_to_reverse_lookup( self, - key.as_str(), + key_str.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), connector_transaction_id.as_str(), @@ -531,7 +537,7 @@ impl PaymentAttemptInterface for KVRouterStore { (None, Some(preprocessing_id)) => { add_preprocessing_id_to_reverse_lookup( self, - key.as_str(), + key_str.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), preprocessing_id.as_str(), @@ -543,7 +549,7 @@ impl PaymentAttemptInterface for KVRouterStore { if old_preprocessing_id.ne(preprocessing_id) { add_preprocessing_id_to_reverse_lookup( self, - key.as_str(), + key_str.as_str(), this.merchant_id.as_str(), updated_attempt.attempt_id.as_str(), preprocessing_id.as_str(), @@ -558,7 +564,7 @@ impl PaymentAttemptInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, KvOperation::Hset::((&field, redis_value), redis_entry), - &key, + key, ) .await .change_context(errors::StorageError::KVError)? @@ -605,7 +611,9 @@ impl PaymentAttemptInterface for KVRouterStore { .await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(try_redis_get_else_try_database_get( async { @@ -636,7 +644,10 @@ impl PaymentAttemptInterface for KVRouterStore { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; let pattern = "pa_*"; let redis_fut = async { @@ -685,7 +696,10 @@ impl PaymentAttemptInterface for KVRouterStore { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; let pattern = "pa_*"; let redis_fut = async { @@ -750,7 +764,9 @@ impl PaymentAttemptInterface for KVRouterStore { .await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(try_redis_get_else_try_database_get( async { kv_wrapper( @@ -796,7 +812,10 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; let field = format!("pa_{attempt_id}"); Box::pin(try_redis_get_else_try_database_get( async { @@ -851,7 +870,9 @@ impl PaymentAttemptInterface for KVRouterStore { .await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(try_redis_get_else_try_database_get( async { kv_wrapper( @@ -907,7 +928,9 @@ impl PaymentAttemptInterface for KVRouterStore { ) .await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(try_redis_get_else_try_database_get( async { @@ -952,7 +975,10 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; Box::pin(try_redis_get_else_try_database_get( async { kv_wrapper(self, KvOperation::::Scan("pa_*"), key) diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index 6323c9a985..8934b3ba2c 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -39,7 +39,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation}, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -59,7 +59,13 @@ impl PaymentIntentInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", new.merchant_id, new.payment_id); + let merchant_id = new.merchant_id.clone(); + let payment_id = new.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let key_str = key.to_string(); let field = format!("pi_{}", new.payment_id); let created_intent = PaymentIntent { id: 0i32, @@ -119,15 +125,15 @@ impl PaymentIntentInterface for KVRouterStore { &created_intent.clone().to_storage_model(), redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue { entity: "payment_intent", - key: Some(key), + key: Some(key_str), } .into()), Ok(HsetnxReply::KeySet) => Ok(created_intent), @@ -151,7 +157,13 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_pid_{}", this.merchant_id, this.payment_id); + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let key_str = key.to_string(); let field = format!("pi_{}", this.payment_id); let diesel_intent_update = payment_intent_update.to_storage_model(); @@ -180,10 +192,10 @@ impl PaymentIntentInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, KvOperation::::Hset((&field, redis_value), redis_entry), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hset() .change_context(StorageError::KVError)?; @@ -212,14 +224,17 @@ impl PaymentIntentInterface for KVRouterStore { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_pid_{payment_id}"); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id, + payment_id, + }; let field = format!("pi_{payment_id}"); Box::pin(utils::try_redis_get_else_try_database_get( async { kv_wrapper::( self, KvOperation::::HGet(&field), - &key, + key, ) .await? .try_into_hget() diff --git a/crates/storage_impl/src/payouts/payout_attempt.rs b/crates/storage_impl/src/payouts/payout_attempt.rs index 991a11d86f..50334442e6 100644 --- a/crates/storage_impl/src/payouts/payout_attempt.rs +++ b/crates/storage_impl/src/payouts/payout_attempt.rs @@ -29,7 +29,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation}, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -50,10 +50,13 @@ impl PayoutAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!( - "mid_{}_poa_{}", - new_payout_attempt.merchant_id, new_payout_attempt.payout_id - ); + let merchant_id = new_payout_attempt.merchant_id.clone(); + let payout_attempt_id = new_payout_attempt.payout_id.clone(); + let key = PartitionKey::MerchantIdPayoutAttemptId { + merchant_id: &merchant_id, + payout_attempt_id: &payout_attempt_id, + }; + let key_str = key.to_string(); let now = common_utils::date_time::now(); let created_attempt = PayoutAttempt { payout_attempt_id: new_payout_attempt.payout_attempt_id.clone(), @@ -92,7 +95,7 @@ impl PayoutAttemptInterface for KVRouterStore { "poa_{}_{}", &created_attempt.merchant_id, &created_attempt.payout_attempt_id, ), - pk_id: key.clone(), + pk_id: key_str.clone(), sk_id: field.clone(), source: "payout_attempt".to_string(), updated_by: storage_scheme.to_string(), @@ -107,15 +110,15 @@ impl PayoutAttemptInterface for KVRouterStore { &created_attempt.clone().to_storage_model(), redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { entity: "payout attempt", - key: Some(key), + key: Some(key_str), } .into()), Ok(HsetnxReply::KeySet) => Ok(created_attempt), @@ -140,7 +143,11 @@ impl PayoutAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_poa_{}", this.merchant_id, this.payout_id); + let key = PartitionKey::MerchantIdPayoutAttemptId { + merchant_id: &this.merchant_id, + payout_attempt_id: &this.payout_id, + }; + let key_str = key.to_string(); let field = format!("poa_{}", this.payout_attempt_id); let diesel_payout_update = payout_update.to_storage_model(); @@ -169,10 +176,10 @@ impl PayoutAttemptInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, KvOperation::::Hset((&field, redis_value), redis_entry), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hset() .change_context(errors::StorageError::KVError)?; @@ -211,7 +218,9 @@ impl PayoutAttemptInterface for KVRouterStore { ) .await ); - let key = &lookup.pk_id; + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; Box::pin(utils::try_redis_get_else_try_database_get( async { kv_wrapper( diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index 437874729b..0f72aa592b 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -38,7 +38,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation}, + redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -56,7 +56,13 @@ impl PayoutsInterface for KVRouterStore { self.router_store.insert_payout(new, storage_scheme).await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_po_{}", new.merchant_id, new.payout_id); + let merchant_id = new.merchant_id.clone(); + let payout_id = new.payout_id.clone(); + let key = PartitionKey::MerchantIdPayoutId { + merchant_id: &merchant_id, + payout_id: &payout_id, + }; + let key_str = key.to_string(); let field = format!("po_{}", new.payout_id); let now = common_utils::date_time::now(); let created_payout = Payouts { @@ -95,15 +101,15 @@ impl PayoutsInterface for KVRouterStore { &created_payout.clone().to_storage_model(), redis_entry, ), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hsetnx() { Ok(HsetnxReply::KeyNotSet) => Err(StorageError::DuplicateValue { entity: "payouts", - key: Some(key), + key: Some(key_str), } .into()), Ok(HsetnxReply::KeySet) => Ok(created_payout), @@ -128,7 +134,11 @@ impl PayoutsInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{}_po_{}", this.merchant_id, this.payout_id); + let key = PartitionKey::MerchantIdPayoutId { + merchant_id: &this.merchant_id, + payout_id: &this.payout_id, + }; + let key_str = key.to_string(); let field = format!("po_{}", this.payout_id); let diesel_payout_update = payout_update.to_storage_model(); @@ -155,10 +165,10 @@ impl PayoutsInterface for KVRouterStore { kv_wrapper::<(), _, _>( self, KvOperation::::Hset((&field, redis_value), redis_entry), - &key, + key, ) .await - .map_err(|err| err.to_redis_failed_response(&key))? + .map_err(|err| err.to_redis_failed_response(&key_str))? .try_into_hset() .change_context(StorageError::KVError)?; @@ -186,14 +196,17 @@ impl PayoutsInterface for KVRouterStore { match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_po_{payout_id}"); + let key = PartitionKey::MerchantIdPayoutId { + merchant_id, + payout_id, + }; let field = format!("po_{payout_id}"); Box::pin(utils::try_redis_get_else_try_database_get( async { kv_wrapper::( self, KvOperation::::HGet(&field), - &key, + key, ) .await? .try_into_hget() @@ -234,14 +247,17 @@ impl PayoutsInterface for KVRouterStore { })) } MerchantStorageScheme::RedisKv => { - let key = format!("mid_{merchant_id}_po_{payout_id}"); + let key = PartitionKey::MerchantIdPayoutId { + merchant_id, + payout_id, + }; let field = format!("po_{payout_id}"); Box::pin(utils::try_redis_get_else_try_database_get( async { kv_wrapper::( self, KvOperation::::HGet(&field), - &key, + key, ) .await? .try_into_hget() diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index c7e6efc0b3..2b046ddb41 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -25,11 +25,23 @@ pub enum PartitionKey<'a> { merchant_id: &'a str, payment_id: &'a str, }, - MerchantIdPaymentIdCombination { + CombinationKey { combination: &'a str, }, + MerchantIdCustomerId { + merchant_id: &'a str, + customer_id: &'a str, + }, + MerchantIdPayoutId { + merchant_id: &'a str, + payout_id: &'a str, + }, + MerchantIdPayoutAttemptId { + merchant_id: &'a str, + payout_attempt_id: &'a str, + }, } - +// PartitionKey::MerchantIdPaymentId {merchant_id, payment_id} impl<'a> std::fmt::Display for PartitionKey<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { @@ -37,9 +49,19 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { merchant_id, payment_id, } => f.write_str(&format!("mid_{merchant_id}_pid_{payment_id}")), - PartitionKey::MerchantIdPaymentIdCombination { combination } => { - f.write_str(combination) - } + PartitionKey::CombinationKey { combination } => f.write_str(combination), + PartitionKey::MerchantIdCustomerId { + merchant_id, + customer_id, + } => f.write_str(&format!("mid_{merchant_id}_cust_{customer_id}")), + PartitionKey::MerchantIdPayoutId { + merchant_id, + payout_id, + } => f.write_str(&format!("mid_{merchant_id}_po_{payout_id}")), + PartitionKey::MerchantIdPayoutAttemptId { + merchant_id, + payout_attempt_id, + } => f.write_str(&format!("mid_{merchant_id}_poa_{payout_attempt_id}")), } } } @@ -90,7 +112,7 @@ where pub async fn kv_wrapper<'a, T, D, S>( store: &KVRouterStore, op: KvOperation<'a, S>, - key: impl AsRef, + partition_key: PartitionKey<'a>, ) -> CustomResult, RedisError> where T: de::DeserializeOwned, @@ -99,21 +121,20 @@ where { let redis_conn = store.get_redis_conn()?; - let key = key.as_ref(); + let key = format!("{}", partition_key); + let type_name = std::any::type_name::(); let operation = op.to_string(); let ttl = store.ttl_for_kv; - let partition_key = PartitionKey::MerchantIdPaymentIdCombination { combination: key }; - let result = async { match op { KvOperation::Hset(value, sql) => { logger::debug!(kv_operation= %operation, value = ?value); redis_conn - .set_hash_fields(key, value, Some(ttl.into())) + .set_hash_fields(&key, value, Some(ttl.into())) .await?; store @@ -125,14 +146,14 @@ where KvOperation::HGet(field) => { let result = redis_conn - .get_hash_field_and_deserialize(key, field, type_name) + .get_hash_field_and_deserialize(&key, field, type_name) .await?; Ok(KvResult::HGet(result)) } KvOperation::Scan(pattern) => { let result: Vec = redis_conn - .hscan_and_deserialize(key, pattern, None) + .hscan_and_deserialize(&key, pattern, None) .await .and_then(|result| { if result.is_empty() { @@ -150,7 +171,7 @@ where value.check_for_constraints(&redis_conn).await?; let result = redis_conn - .serialize_and_set_hash_field_if_not_exist(key, field, value, Some(ttl)) + .serialize_and_set_hash_field_if_not_exist(&key, field, value, Some(ttl)) .await?; if matches!(result, redis_interface::HsetnxReply::KeySet) { @@ -167,7 +188,7 @@ where logger::debug!(kv_operation= %operation, value = ?value); let result = redis_conn - .serialize_and_set_key_if_not_exist(key, value, Some(ttl.into())) + .serialize_and_set_key_if_not_exist(&key, value, Some(ttl.into())) .await?; value.check_for_constraints(&redis_conn).await?; @@ -183,7 +204,7 @@ where } KvOperation::Get => { - let result = redis_conn.get_and_deserialize_key(key, type_name).await?; + let result = redis_conn.get_and_deserialize_key(&key, type_name).await?; Ok(KvResult::Get(result)) } }