From 3fa59d4bac01de8fa25e28340a57e578d9980032 Mon Sep 17 00:00:00 2001 From: akshay-97 Date: Mon, 20 May 2024 12:47:05 +0530 Subject: [PATCH] feat: Soft kill kv (#4582) Co-authored-by: Akshay S Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/development.toml | 1 + config/docker_compose.toml | 1 + crates/diesel_models/src/customers.rs | 11 +++ crates/diesel_models/src/mandate.rs | 26 +++++++ crates/diesel_models/src/payment_method.rs | 31 ++++++++ crates/diesel_models/src/schema.rs | 6 ++ crates/router/src/configs/defaults.rs | 5 +- crates/router/src/configs/settings.rs | 15 +++- crates/router/src/core/admin.rs | 7 ++ crates/router/src/core/customers.rs | 1 + .../router/src/core/payment_methods/cards.rs | 1 + crates/router/src/core/payments/helpers.rs | 1 + crates/router/src/core/payouts/helpers.rs | 1 + crates/router/src/core/pm_auth.rs | 1 + crates/router/src/db/address.rs | 31 ++++++-- crates/router/src/db/customers.rs | 39 +++++++--- crates/router/src/db/mandate.rs | 73 +++++++++---------- crates/router/src/db/payment_method.rs | 56 ++++++++++---- crates/router/src/db/refund.rs | 42 +++++++++-- crates/router/src/db/reverse_lookup.rs | 9 ++- crates/router/src/services.rs | 1 + crates/router/src/types/domain/customer.rs | 4 + crates/storage_impl/src/lib.rs | 9 ++- crates/storage_impl/src/lookup.rs | 6 +- crates/storage_impl/src/metrics.rs | 1 + .../src/payments/payment_attempt.rs | 37 ++++++++-- .../src/payments/payment_intent.rs | 40 ++++++---- .../src/payouts/payout_attempt.rs | 22 ++++-- crates/storage_impl/src/payouts/payouts.rs | 24 ++++-- crates/storage_impl/src/redis/kv_store.rs | 66 +++++++++++++++++ .../down.sql | 6 ++ .../up.sql | 6 ++ 32 files changed, 462 insertions(+), 118 deletions(-) create mode 100644 migrations/2024-05-14-092623_add_updated_by_column/down.sql create mode 100644 migrations/2024-05-14-092623_add_updated_by_column/up.sql diff --git a/config/development.toml b/config/development.toml index 80f4d2b8ea..af26d91446 100644 --- a/config/development.toml +++ b/config/development.toml @@ -560,6 +560,7 @@ delay_between_retries_in_milliseconds = 500 [kv_config] ttl = 900 # 15 * 60 seconds +soft_kill = false [frm] enabled = true diff --git a/config/docker_compose.toml b/config/docker_compose.toml index b9b25e4666..c13c436a19 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -450,6 +450,7 @@ queue_strategy = "Fifo" [kv_config] ttl = 900 # 15 * 60 seconds +soft_kill = false [frm] enabled = true diff --git a/crates/diesel_models/src/customers.rs b/crates/diesel_models/src/customers.rs index fcabc8879b..0d1657136e 100644 --- a/crates/diesel_models/src/customers.rs +++ b/crates/diesel_models/src/customers.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use time::PrimitiveDateTime; @@ -21,6 +22,13 @@ pub struct CustomerNew { pub created_at: PrimitiveDateTime, pub modified_at: PrimitiveDateTime, pub address_id: Option, + pub updated_by: Option, +} + +impl CustomerNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } impl From for Customer { @@ -40,6 +48,7 @@ impl From for Customer { modified_at: customer_new.modified_at, address_id: customer_new.address_id, default_payment_method_id: None, + updated_by: customer_new.updated_by, } } } @@ -61,6 +70,7 @@ pub struct Customer { pub modified_at: PrimitiveDateTime, pub address_id: Option, pub default_payment_method_id: Option, + pub updated_by: Option, } #[derive( @@ -84,6 +94,7 @@ pub struct CustomerUpdateInternal { pub connector_customer: Option, pub address_id: Option, pub default_payment_method_id: Option>, + pub updated_by: Option, } impl CustomerUpdateInternal { diff --git a/crates/diesel_models/src/mandate.rs b/crates/diesel_models/src/mandate.rs index 39c43a4a17..65576b08f2 100644 --- a/crates/diesel_models/src/mandate.rs +++ b/crates/diesel_models/src/mandate.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use masking::Secret; @@ -32,6 +33,7 @@ pub struct Mandate { pub connector_mandate_ids: Option, pub original_payment_id: Option, pub merchant_connector_id: Option, + pub updated_by: Option, } #[derive( @@ -69,6 +71,13 @@ pub struct MandateNew { pub connector_mandate_ids: Option, pub original_payment_id: Option, pub merchant_connector_id: Option, + pub updated_by: Option, +} + +impl MandateNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } #[derive(Debug)] @@ -90,6 +99,17 @@ pub enum MandateUpdate { }, } +impl MandateUpdate { + pub fn convert_to_mandate_update( + self, + storage_scheme: MerchantStorageScheme, + ) -> MandateUpdateInternal { + let mut updated_object = MandateUpdateInternal::from(self); + updated_object.updated_by = Some(storage_scheme.to_string()); + updated_object + } +} + #[derive(Clone, Eq, PartialEq, Copy, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct SingleUseMandate { pub amount: i64, @@ -113,6 +133,7 @@ pub struct MandateUpdateInternal { connector_mandate_id: Option, payment_method_id: Option, original_payment_id: Option, + updated_by: Option, } impl From for MandateUpdateInternal { @@ -125,6 +146,7 @@ impl From for MandateUpdateInternal { connector_mandate_id: None, payment_method_id: None, original_payment_id: None, + updated_by: None, }, MandateUpdate::CaptureAmountUpdate { amount_captured } => Self { mandate_status: None, @@ -133,6 +155,7 @@ impl From for MandateUpdateInternal { connector_mandate_id: None, payment_method_id: None, original_payment_id: None, + updated_by: None, }, MandateUpdate::ConnectorReferenceUpdate { connector_mandate_ids, @@ -165,6 +188,7 @@ impl MandateUpdateInternal { connector_mandate_id, payment_method_id, original_payment_id, + updated_by, } = self; Mandate { @@ -174,6 +198,7 @@ impl MandateUpdateInternal { connector_mandate_id: connector_mandate_id.map_or(source.connector_mandate_id, Some), payment_method_id: payment_method_id.unwrap_or(source.payment_method_id), original_payment_id: original_payment_id.map_or(source.original_payment_id, Some), + updated_by: updated_by.map_or(source.updated_by, Some), ..source } } @@ -208,6 +233,7 @@ impl From<&MandateNew> for Mandate { connector_mandate_ids: mandate_new.connector_mandate_ids.clone(), original_payment_id: mandate_new.original_payment_id.clone(), merchant_connector_id: mandate_new.merchant_connector_id.clone(), + updated_by: mandate_new.updated_by.clone(), } } } diff --git a/crates/diesel_models/src/payment_method.rs b/crates/diesel_models/src/payment_method.rs index e718417431..f4d46b1d28 100644 --- a/crates/diesel_models/src/payment_method.rs +++ b/crates/diesel_models/src/payment_method.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use masking::Secret; @@ -41,6 +42,7 @@ pub struct PaymentMethod { pub network_transaction_id: Option, pub client_secret: Option, pub payment_method_billing_address: Option, + pub updated_by: Option, } #[derive( @@ -77,6 +79,13 @@ pub struct PaymentMethodNew { pub network_transaction_id: Option, pub client_secret: Option, pub payment_method_billing_address: Option, + pub updated_by: Option, +} + +impl PaymentMethodNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } #[derive(Debug, Eq, PartialEq, Deserialize, Serialize)] @@ -116,6 +125,17 @@ pub enum PaymentMethodUpdate { }, } +impl PaymentMethodUpdate { + pub fn convert_to_payment_method_update( + self, + storage_scheme: MerchantStorageScheme, + ) -> PaymentMethodUpdateInternal { + let mut update_internal: PaymentMethodUpdateInternal = self.into(); + update_internal.updated_by = Some(storage_scheme.to_string()); + update_internal + } +} + #[derive( Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize, )] @@ -129,6 +149,7 @@ pub struct PaymentMethodUpdateInternal { locker_id: Option, payment_method: Option, connector_mandate_details: Option, + updated_by: Option, payment_method_type: Option, payment_method_issuer: Option, } @@ -148,6 +169,7 @@ impl PaymentMethodUpdateInternal { network_transaction_id, status, connector_mandate_details, + updated_by, .. } = self; @@ -160,6 +182,7 @@ impl PaymentMethodUpdateInternal { status: status.unwrap_or(source.status), connector_mandate_details: connector_mandate_details .map_or(source.connector_mandate_details, Some), + updated_by: updated_by.map_or(source.updated_by, Some), ..source } } @@ -177,6 +200,7 @@ impl From for PaymentMethodUpdateInternal { locker_id: None, payment_method: None, connector_mandate_details: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -191,6 +215,7 @@ impl From for PaymentMethodUpdateInternal { locker_id: None, payment_method: None, connector_mandate_details: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -203,6 +228,7 @@ impl From for PaymentMethodUpdateInternal { locker_id: None, payment_method: None, connector_mandate_details: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -218,6 +244,7 @@ impl From for PaymentMethodUpdateInternal { locker_id: None, payment_method: None, connector_mandate_details: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -230,6 +257,7 @@ impl From for PaymentMethodUpdateInternal { locker_id: None, payment_method: None, connector_mandate_details: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -249,6 +277,7 @@ impl From for PaymentMethodUpdateInternal { locker_id, payment_method, connector_mandate_details: None, + updated_by: None, payment_method_issuer, payment_method_type, }, @@ -263,6 +292,7 @@ impl From for PaymentMethodUpdateInternal { payment_method: None, connector_mandate_details, network_transaction_id: None, + updated_by: None, payment_method_issuer: None, payment_method_type: None, }, @@ -302,6 +332,7 @@ impl From<&PaymentMethodNew> for PaymentMethod { status: payment_method_new.status, network_transaction_id: payment_method_new.network_transaction_id.clone(), client_secret: payment_method_new.client_secret.clone(), + updated_by: payment_method_new.updated_by.clone(), payment_method_billing_address: payment_method_new .payment_method_billing_address .clone(), diff --git a/crates/diesel_models/src/schema.rs b/crates/diesel_models/src/schema.rs index 61022f2f25..813fdd1f12 100644 --- a/crates/diesel_models/src/schema.rs +++ b/crates/diesel_models/src/schema.rs @@ -295,6 +295,8 @@ diesel::table! { address_id -> Nullable, #[max_length = 64] default_payment_method_id -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } @@ -590,6 +592,8 @@ diesel::table! { original_payment_id -> Nullable, #[max_length = 32] merchant_connector_id -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } @@ -933,6 +937,8 @@ diesel::table! { #[max_length = 128] client_secret -> Nullable, payment_method_billing_address -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index 667bc90fe4..213492ac0d 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -123,7 +123,10 @@ impl Default for super::settings::DrainerSettings { #[cfg(feature = "kv_store")] impl Default for super::settings::KvConfig { fn default() -> Self { - Self { ttl: 900 } + Self { + ttl: 900, + soft_kill: Some(false), + } } } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 2dfae47bff..9564a759cc 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -18,7 +18,7 @@ use external_services::{ }, }; use hyperswitch_interfaces::secrets_interface::secret_state::{ - SecretState, SecretStateContainer, SecuredSecret, + RawSecret, SecretState, SecretStateContainer, SecuredSecret, }; use masking::Secret; use redis_interface::RedisSettings; @@ -138,6 +138,7 @@ pub struct Frm { #[derive(Debug, Deserialize, Clone)] pub struct KvConfig { pub ttl: u32, + pub soft_kill: Option, } #[derive(Debug, Deserialize, Clone, Default)] @@ -759,6 +760,18 @@ impl Settings { } } +impl Settings { + #[cfg(feature = "kv_store")] + pub fn is_kv_soft_kill_mode(&self) -> bool { + self.kv_config.soft_kill.unwrap_or(false) + } + + #[cfg(not(feature = "kv_store"))] + pub fn is_kv_soft_kill_mode(&self) -> bool { + false + } +} + #[cfg(feature = "payouts")] #[derive(Debug, Deserialize, Clone, Default)] pub struct Payouts { diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 288de66de0..0a70d3f058 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -1382,6 +1382,13 @@ pub async fn kv_for_merchant( Ok(merchant_account) } (true, MerchantStorageScheme::PostgresOnly) => { + if state.conf.as_ref().is_kv_soft_kill_mode() { + Err(errors::ApiErrorResponse::InvalidRequestData { + message: "Kv cannot be enabled when application is in soft_kill_mode" + .to_owned(), + })? + } + db.update_merchant( merchant_account, storage::MerchantAccountUpdate::StorageSchemeUpdate { diff --git a/crates/router/src/core/customers.rs b/crates/router/src/core/customers.rs index 3b10b408f9..0f3d60cccd 100644 --- a/crates/router/src/core/customers.rs +++ b/crates/router/src/core/customers.rs @@ -116,6 +116,7 @@ pub async fn create_customer( created_at: common_utils::date_time::now(), modified_at: common_utils::date_time::now(), default_payment_method_id: None, + updated_by: None, }) } .await diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index f7667baefa..59b1c0f478 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -140,6 +140,7 @@ pub async fn create_payment_method( last_modified: current_time, last_used_at: current_time, payment_method_billing_address, + updated_by: None, }, storage_scheme, ) diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index b4dd06b8ce..2c442c5ce4 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -1608,6 +1608,7 @@ pub async fn create_customer_if_not_exist<'a, F: Clone, R>( connector_customer: None, address_id: None, default_payment_method_id: None, + updated_by: None, }, ) } diff --git a/crates/router/src/core/payouts/helpers.rs b/crates/router/src/core/payouts/helpers.rs index e8baba0d12..46c3c8c9aa 100644 --- a/crates/router/src/core/payouts/helpers.rs +++ b/crates/router/src/core/payouts/helpers.rs @@ -620,6 +620,7 @@ pub async fn get_or_create_customer_details( modified_at: common_utils::date_time::now(), address_id: None, default_payment_method_id: None, + updated_by: None, }; Ok(Some( diff --git a/crates/router/src/core/pm_auth.rs b/crates/router/src/core/pm_auth.rs index 8d30a79df1..f184e166e2 100644 --- a/crates/router/src/core/pm_auth.rs +++ b/crates/router/src/core/pm_auth.rs @@ -467,6 +467,7 @@ async fn store_bank_details_in_payment_methods( network_transaction_id: None, client_secret: None, payment_method_billing_address: None, + updated_by: None, }; new_entries.push(pm_new); diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index c93e33b147..5a6b2606d0 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -275,7 +275,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::AddressInterface; use crate::{ @@ -332,6 +334,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Address>(self, storage_scheme, Op::Find) + .await; let address = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -394,6 +399,18 @@ mod storage { let address = Conversion::convert(this) .await .change_context(errors::StorageError::EncryptionError)?; + let merchant_id = address.merchant_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("add_{}", address.address_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::Address>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(address.updated_by.as_str())), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { address @@ -409,12 +426,6 @@ mod storage { .await } MerchantStorageScheme::RedisKv => { - let merchant_id = address.merchant_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; - let field = format!("add_{}", address.address_id); let updated_address = AddressUpdateInternal::from(address_update.clone()) .create_address(address.clone()); let redis_value = serde_json::to_string(&updated_address) @@ -466,6 +477,12 @@ mod storage { .await .change_context(errors::StorageError::EncryptionError)?; let merchant_id = address_new.merchant_id.clone(); + let storage_scheme = decide_storage_scheme::<_, storage_types::Address>( + self, + storage_scheme, + Op::Insert, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; diff --git a/crates/router/src/db/customers.rs b/crates/router/src/db/customers.rs index 00fb7e4a08..5f8dbe69f8 100644 --- a/crates/router/src/db/customers.rs +++ b/crates/router/src/db/customers.rs @@ -75,7 +75,9 @@ mod storage { use futures::future::try_join_all; use masking::PeekInterface; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::CustomerInterface; use crate::{ @@ -116,7 +118,9 @@ mod storage { .await .map_err(|err| report!(errors::StorageError::from(err))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; let maybe_customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -184,15 +188,20 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: merchant_id.as_str(), + customer_id: customer_id.as_str(), + }; + let field = format!("cust_{}", customer_id); + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Update(key.clone(), &field, customer.updated_by.as_deref()), + ) + .await; let updated_object = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: merchant_id.as_str(), - customer_id: customer_id.as_str(), - }; - let field = format!("cust_{}", customer_id); let updated_customer = diesel_models::CustomerUpdateInternal::from(customer_update.clone()) .apply_changeset(customer.clone()); @@ -250,7 +259,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; let customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -324,11 +335,17 @@ mod storage { ) -> CustomResult { let customer_id = customer_data.customer_id.clone(); let merchant_id = customer_data.merchant_id.clone(); - let new_customer = customer_data + let mut new_customer = customer_data .construct_new() .await .change_context(errors::StorageError::EncryptionError)?; - + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Insert, + ) + .await; + new_customer.update_storage_scheme(storage_scheme); let create_customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; diff --git a/crates/router/src/db/mandate.rs b/crates/router/src/db/mandate.rs index 1751657e3f..c9995d502c 100644 --- a/crates/router/src/db/mandate.rs +++ b/crates/router/src/db/mandate.rs @@ -57,7 +57,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::MandateInterface; use crate::{ @@ -88,7 +90,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Mandate>(self, storage_scheme, Op::Find) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -132,7 +136,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Mandate>(self, storage_scheme, Op::Find) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -187,24 +193,29 @@ mod storage { storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_write(self).await?; - + let key = PartitionKey::MerchantIdMandateId { + merchant_id, + mandate_id, + }; + let field = format!("mandate_{}", mandate_id); + let storage_scheme = decide_storage_scheme::<_, diesel_models::Mandate>( + self, + storage_scheme, + Op::Update(key.clone(), &field, mandate.updated_by.as_deref()), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { storage_types::Mandate::update_by_merchant_id_mandate_id( &conn, merchant_id, mandate_id, - storage_types::MandateUpdateInternal::from(mandate_update), + mandate_update.convert_to_mandate_update(storage_scheme), ) .await .map_err(|error| report!(errors::StorageError::from(error))) } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdMandateId { - merchant_id, - mandate_id, - }; - let field = format!("mandate_{}", mandate_id); let key_str = key.to_string(); if let diesel_models::MandateUpdate::ConnectorMandateIdUpdate { @@ -223,7 +234,7 @@ mod storage { .await?; } - let m_update = diesel_models::MandateUpdateInternal::from(mandate_update); + let m_update = mandate_update.convert_to_mandate_update(storage_scheme); let updated_mandate = m_update.clone().apply_changeset(mandate.clone()); let redis_value = serde_json::to_string(&updated_mandate) @@ -271,11 +282,17 @@ mod storage { #[instrument(skip_all)] async fn insert_mandate( &self, - mandate: storage_types::MandateNew, + mut mandate: storage_types::MandateNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_write(self).await?; - + let storage_scheme = decide_storage_scheme::<_, diesel_models::Mandate>( + self, + storage_scheme, + Op::Insert, + ) + .await; + mandate.update_storage_scheme(storage_scheme); match storage_scheme { MerchantStorageScheme::PostgresOnly => mandate .insert(&conn) @@ -516,32 +533,9 @@ impl MandateInterface for MockDb { .find(|mandate| mandate.merchant_id == merchant_id && mandate.mandate_id == mandate_id) { Some(mandate) => { - match mandate_update { - storage_types::MandateUpdate::StatusUpdate { mandate_status } => { - mandate.mandate_status = mandate_status; - } - storage_types::MandateUpdate::CaptureAmountUpdate { amount_captured } => { - mandate.amount_captured = amount_captured; - } - storage_types::MandateUpdate::ConnectorReferenceUpdate { - connector_mandate_ids, - } => { - mandate.connector_mandate_ids = connector_mandate_ids; - } - - diesel_models::MandateUpdate::ConnectorMandateIdUpdate { - connector_mandate_id, - connector_mandate_ids, - payment_method_id, - original_payment_id, - } => { - mandate.connector_mandate_ids = connector_mandate_ids; - mandate.connector_mandate_id = connector_mandate_id; - mandate.payment_method_id = payment_method_id; - mandate.original_payment_id = original_payment_id - } - } - Ok(mandate.clone()) + let m_update = diesel_models::MandateUpdateInternal::from(mandate_update); + let updated_mandate = m_update.clone().apply_changeset(mandate.clone()); + Ok(updated_mandate) } None => { Err(errors::StorageError::ValueNotFound("mandate not found".to_string()).into()) @@ -634,6 +628,7 @@ impl MandateInterface for MockDb { metadata: mandate_new.metadata, connector_mandate_ids: mandate_new.connector_mandate_ids, merchant_connector_id: mandate_new.merchant_connector_id, + updated_by: mandate_new.updated_by, }; mandates.push(mandate.clone()); Ok(mandate) diff --git a/crates/router/src/db/payment_method.rs b/crates/router/src/db/payment_method.rs index 7c987c38d5..29b3379a29 100644 --- a/crates/router/src/db/payment_method.rs +++ b/crates/router/src/db/payment_method.rs @@ -71,7 +71,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::PaymentMethodInterface; use crate::{ @@ -97,7 +99,12 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Find, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -141,7 +148,12 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Find, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -194,9 +206,16 @@ mod storage { #[instrument(skip_all)] async fn insert_payment_method( &self, - payment_method_new: storage_types::PaymentMethodNew, + mut payment_method_new: storage_types::PaymentMethodNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Insert, + ) + .await; + payment_method_new.update_storage_scheme(storage_scheme); match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -278,25 +297,35 @@ mod storage { payment_method_update: storage_types::PaymentMethodUpdate, storage_scheme: MerchantStorageScheme, ) -> CustomResult { + let merchant_id = payment_method.merchant_id.clone(); + let customer_id = payment_method.customer_id.clone(); + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let field = format!("payment_method_id_{}", payment_method.payment_method_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Update(key.clone(), &field, payment_method.updated_by.as_deref()), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; payment_method - .update_with_payment_method_id(&conn, payment_method_update.into()) + .update_with_payment_method_id( + &conn, + payment_method_update.convert_to_payment_method_update(storage_scheme), + ) .await .map_err(|error| report!(errors::StorageError::from(error))) } MerchantStorageScheme::RedisKv => { - let merchant_id = payment_method.merchant_id.clone(); - let customer_id = payment_method.customer_id.clone(); - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; let key_str = key.to_string(); - let field = format!("payment_method_id_{}", payment_method.payment_method_id); - let p_update: PaymentMethodUpdateInternal = payment_method_update.into(); + let p_update: PaymentMethodUpdateInternal = + payment_method_update.convert_to_payment_method_update(storage_scheme); let updated_payment_method = p_update.clone().apply_changeset(payment_method.clone()); @@ -662,6 +691,7 @@ impl PaymentMethodInterface for MockDb { status: payment_method_new.status, client_secret: payment_method_new.client_secret, network_transaction_id: payment_method_new.network_transaction_id, + updated_by: payment_method_new.updated_by, payment_method_billing_address: payment_method_new.payment_method_billing_address, }; payment_methods.push(payment_method.clone()); diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index df1130b7af..3b24dd3960 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -275,7 +275,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::RefundInterface; use crate::{ @@ -305,6 +307,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -341,6 +346,9 @@ mod storage { new: storage_types::RefundNew, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Insert) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -485,6 +493,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -526,6 +537,19 @@ mod storage { refund: storage_types::RefundUpdate, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::Refund>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -534,14 +558,7 @@ mod storage { .map_err(|error| report!(errors::StorageError::from(error))) } enums::MerchantStorageScheme::RedisKv => { - let merchant_id = this.merchant_id.clone(); - let payment_id = this.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); let updated_refund = refund.clone().apply_changeset(this.clone()); let redis_value = updated_refund @@ -588,6 +605,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -637,6 +657,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -685,6 +708,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index 121ec06ec1..fcf38ca420 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -69,7 +69,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::SetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::{ReverseLookupInterface, Store}; use crate::{ @@ -91,6 +93,8 @@ mod storage { new: ReverseLookupNew, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, ReverseLookup>(self, storage_scheme, Op::Insert).await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -150,7 +154,8 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, ReverseLookup>(self, storage_scheme, Op::Find).await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 5af325a46f..6d9d657e10 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -79,6 +79,7 @@ pub async fn get_store( config.drainer.stream_name.clone(), config.drainer.num_partitions, config.kv_config.ttl, + config.kv_config.soft_kill, ); Ok(store) diff --git a/crates/router/src/types/domain/customer.rs b/crates/router/src/types/domain/customer.rs index 139cd10577..0e8ac32b4a 100644 --- a/crates/router/src/types/domain/customer.rs +++ b/crates/router/src/types/domain/customer.rs @@ -23,6 +23,7 @@ pub struct Customer { pub connector_customer: Option, pub address_id: Option, pub default_payment_method_id: Option, + pub updated_by: Option, } #[async_trait::async_trait] @@ -47,6 +48,7 @@ impl super::behaviour::Conversion for Customer { connector_customer: self.connector_customer, address_id: self.address_id, default_payment_method_id: self.default_payment_method_id, + updated_by: self.updated_by, }) } @@ -75,6 +77,7 @@ impl super::behaviour::Conversion for Customer { connector_customer: item.connector_customer, address_id: item.address_id, default_payment_method_id: item.default_payment_method_id, + updated_by: item.updated_by, }) } .await @@ -98,6 +101,7 @@ impl super::behaviour::Conversion for Customer { modified_at: now, connector_customer: self.connector_customer, address_id: self.address_id, + updated_by: self.updated_by, }) } } diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 7a3ef30bef..0963cd2b41 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -151,6 +151,7 @@ pub struct KVRouterStore { drainer_num_partitions: u8, ttl_for_kv: u32, pub request_id: Option, + soft_kill_mode: bool, } #[async_trait::async_trait] @@ -159,14 +160,16 @@ where RouterStore: DatabaseStore, T: DatabaseStore, { - type Config = (RouterStore, String, u8, u32); + type Config = (RouterStore, String, u8, u32, Option); async fn new(config: Self::Config, _test_transaction: bool) -> StorageResult { - let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv) = config; + let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = + config; Ok(Self::from_store( router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, + soft_kill_mode, )) } fn get_master_pool(&self) -> &PgPool { @@ -191,6 +194,7 @@ impl KVRouterStore { drainer_stream_name: String, drainer_num_partitions: u8, ttl_for_kv: u32, + soft_kill: Option, ) -> Self { let request_id = store.request_id.clone(); @@ -200,6 +204,7 @@ impl KVRouterStore { drainer_num_partitions, ttl_for_kv, request_id, + soft_kill_mode: soft_kill.unwrap_or(false), } } diff --git a/crates/storage_impl/src/lookup.rs b/crates/storage_impl/src/lookup.rs index f8daabf2e9..67b8635aba 100644 --- a/crates/storage_impl/src/lookup.rs +++ b/crates/storage_impl/src/lookup.rs @@ -12,7 +12,7 @@ use redis_interface::SetnxReply; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, try_redis_get_else_try_database_get}, DatabaseStore, KVRouterStore, RouterStore, }; @@ -71,6 +71,8 @@ impl ReverseLookupInterface for KVRouterStore { new: DieselReverseLookupNew, storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Insert).await; match storage_scheme { storage_enums::MerchantStorageScheme::PostgresOnly => { self.router_store @@ -124,6 +126,8 @@ impl ReverseLookupInterface for KVRouterStore { .get_lookup_by_lookup_id(id, storage_scheme) .await }; + let storage_scheme = + decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Find).await; match storage_scheme { storage_enums::MerchantStorageScheme::PostgresOnly => database_call().await, storage_enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/storage_impl/src/metrics.rs b/crates/storage_impl/src/metrics.rs index 29bca2a007..2f22d57813 100644 --- a/crates/storage_impl/src/metrics.rs +++ b/crates/storage_impl/src/metrics.rs @@ -10,3 +10,4 @@ counter_metric!(KV_OPERATION_SUCCESSFUL, GLOBAL_METER); counter_metric!(KV_OPERATION_FAILED, GLOBAL_METER); counter_metric!(KV_PUSHED_TO_DRAINER, GLOBAL_METER); counter_metric!(KV_FAILED_TO_PUSH_TO_DRAINER, GLOBAL_METER); +counter_metric!(KV_SOFT_KILL_ACTIVE_UPDATE, GLOBAL_METER); diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index d4709648c4..e85e0a131a 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -31,7 +31,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get}, DataModelExt, DatabaseStore, KVRouterStore, RouterStore, }; @@ -333,6 +333,9 @@ impl PaymentAttemptInterface for KVRouterStore { payment_attempt: PaymentAttemptNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Insert) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -470,6 +473,17 @@ impl PaymentAttemptInterface for KVRouterStore { payment_attempt: PaymentAttemptUpdate, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &this.merchant_id, + payment_id: &this.payment_id, + }; + let field = format!("pa_{}", this.attempt_id); + let storage_scheme = decide_storage_scheme::<_, DieselPaymentAttempt>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -477,10 +491,6 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &this.merchant_id, - payment_id: &this.payment_id, - }; let key_str = key.to_string(); let old_connector_transaction_id = &this.connector_transaction_id; let old_preprocessing_id = &this.preprocessing_step_id; @@ -493,7 +503,6 @@ impl PaymentAttemptInterface for KVRouterStore { // Check for database presence as well Maybe use a read replica here ? let redis_value = serde_json::to_string(&updated_attempt) .change_context(errors::StorageError::KVError)?; - let field = format!("pa_{}", updated_attempt.attempt_id); let redis_entry = kv::TypedSql { op: kv::DBOperation::Update { @@ -588,6 +597,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -645,6 +656,8 @@ impl PaymentAttemptInterface for KVRouterStore { storage_scheme, ) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -697,6 +710,8 @@ impl PaymentAttemptInterface for KVRouterStore { storage_scheme, ) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -744,6 +759,8 @@ impl PaymentAttemptInterface for KVRouterStore { connector_txn_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -804,6 +821,8 @@ impl PaymentAttemptInterface for KVRouterStore { attempt_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -850,6 +869,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -909,6 +930,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -968,6 +991,8 @@ impl PaymentAttemptInterface for KVRouterStore { payment_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result, errors::StorageError> { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index c7592e4821..b3cfd8060c 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -43,7 +43,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -55,6 +55,15 @@ impl PaymentIntentInterface for KVRouterStore { new: PaymentIntentNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let merchant_id = new.merchant_id.clone(); + let payment_id = new.payment_id.clone(); + let field = format!("pi_{}", new.payment_id); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -63,14 +72,7 @@ impl PaymentIntentInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { - let merchant_id = new.merchant_id.clone(); - let payment_id = new.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pi_{}", new.payment_id); let created_intent = PaymentIntent { id: 0i32, payment_id: new.payment_id.clone(), @@ -155,6 +157,19 @@ impl PaymentIntentInterface for KVRouterStore { payment_intent_update: PaymentIntentUpdate, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("pi_{}", this.payment_id); + let storage_scheme = decide_storage_scheme::<_, DieselPaymentIntent>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -162,14 +177,7 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let merchant_id = this.merchant_id.clone(); - let payment_id = this.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pi_{}", this.payment_id); let diesel_intent_update = payment_intent_update.to_storage_model(); let origin_diesel_intent = this.to_storage_model(); @@ -225,6 +233,8 @@ impl PaymentIntentInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, diff --git a/crates/storage_impl/src/payouts/payout_attempt.rs b/crates/storage_impl/src/payouts/payout_attempt.rs index 6a62832e1b..91c1669db3 100644 --- a/crates/storage_impl/src/payouts/payout_attempt.rs +++ b/crates/storage_impl/src/payouts/payout_attempt.rs @@ -29,7 +29,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -43,6 +43,8 @@ impl PayoutAttemptInterface for KVRouterStore { payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -136,6 +138,17 @@ impl PayoutAttemptInterface for KVRouterStore { payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPayoutAttemptId { + merchant_id: &this.merchant_id, + payout_attempt_id: &this.payout_id, + }; + let field = format!("poa_{}", this.payout_attempt_id); + let storage_scheme = decide_storage_scheme::<_, DieselPayoutAttempt>( + self, + storage_scheme, + Op::Update(key.clone(), &field, None), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -143,12 +156,7 @@ impl PayoutAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPayoutAttemptId { - merchant_id: &this.merchant_id, - payout_attempt_id: &this.payout_id, - }; let key_str = key.to_string(); - let field = format!("poa_{}", this.payout_attempt_id); let diesel_payout_update = payout_update.to_storage_model(); let origin_diesel_payout = this.clone().to_storage_model(); @@ -195,6 +203,8 @@ impl PayoutAttemptInterface for KVRouterStore { payout_attempt_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index 19921121a9..92835dee59 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -38,7 +38,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -51,6 +51,8 @@ impl PayoutsInterface for KVRouterStore { new: PayoutsNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store.insert_payout(new, storage_scheme).await @@ -128,6 +130,17 @@ impl PayoutsInterface for KVRouterStore { payout_attempt: &PayoutAttempt, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPayoutId { + merchant_id: &this.merchant_id, + payout_id: &this.payout_id, + }; + let field = format!("po_{}", this.payout_id); + let storage_scheme = decide_storage_scheme::<_, DieselPayouts>( + self, + storage_scheme, + Op::Update(key.clone(), &field, None), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -135,12 +148,7 @@ impl PayoutsInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPayoutId { - merchant_id: &this.merchant_id, - payout_id: &this.payout_id, - }; let key_str = key.to_string(); - let field = format!("po_{}", this.payout_id); let diesel_payout_update = payout_update.to_storage_model(); let origin_diesel_payout = this.clone().to_storage_model(); @@ -194,6 +202,8 @@ impl PayoutsInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -236,6 +246,8 @@ impl PayoutsInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let maybe_payouts = database_call().await?; diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 7853763f32..9c429058b7 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use common_utils::errors::CustomResult; +use diesel_models::enums::MerchantStorageScheme; use error_stack::report; use redis_interface::errors::RedisError; use router_derive::TryGetEnumVariant; @@ -20,6 +21,7 @@ pub trait KvStorePartition { } #[allow(unused)] +#[derive(Clone)] pub enum PartitionKey<'a> { MerchantIdPaymentId { merchant_id: &'a str, @@ -235,3 +237,67 @@ where err }) } + +pub enum Op<'a> { + Insert, + Update(PartitionKey<'a>, &'a str, Option<&'a str>), + Find, +} + +impl<'a> std::fmt::Display for Op<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Op::Insert => f.write_str("insert"), + Op::Find => f.write_str("find"), + Op::Update(p_key, _, _) => f.write_str(&format!("update_{}", p_key)), + } + } +} + +pub async fn decide_storage_scheme<'a, T, D>( + store: &KVRouterStore, + storage_scheme: MerchantStorageScheme, + operation: Op<'a>, +) -> MerchantStorageScheme +where + D: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync, + T: crate::database::store::DatabaseStore, +{ + if store.soft_kill_mode { + let ops = operation.to_string(); + let updated_scheme = match operation { + Op::Insert => MerchantStorageScheme::PostgresOnly, + Op::Find => MerchantStorageScheme::RedisKv, + Op::Update(partition_key, field, Some("redis_kv")) => { + match kv_wrapper::(store, KvOperation::::HGet(field), partition_key) + .await + { + Ok(_) => { + metrics::KV_SOFT_KILL_ACTIVE_UPDATE.add(&metrics::CONTEXT, 1, &[]); + MerchantStorageScheme::RedisKv + } + Err(_) => MerchantStorageScheme::PostgresOnly, + } + } + + Op::Update(_, _, None) => MerchantStorageScheme::PostgresOnly, + Op::Update(_, _, Some("postgres_only")) => MerchantStorageScheme::PostgresOnly, + _ => { + logger::debug!("soft_kill_mode - using default storage scheme"); + storage_scheme + } + }; + + let type_name = std::any::type_name::(); + logger::info!(soft_kill_mode = "decide_storage_scheme", decided_scheme = %updated_scheme, configured_scheme = %storage_scheme,entity = %type_name, operation = %ops); + + updated_scheme + } else { + storage_scheme + } +} diff --git a/migrations/2024-05-14-092623_add_updated_by_column/down.sql b/migrations/2024-05-14-092623_add_updated_by_column/down.sql new file mode 100644 index 0000000000..436a57dade --- /dev/null +++ b/migrations/2024-05-14-092623_add_updated_by_column/down.sql @@ -0,0 +1,6 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE payment_methods DROP COLUMN IF EXISTS updated_by; + +ALTER TABLE mandate DROP COLUMN IF EXISTS updated_by; + +ALTER TABLE customers DROP COLUMN IF EXISTS updated_by; \ No newline at end of file diff --git a/migrations/2024-05-14-092623_add_updated_by_column/up.sql b/migrations/2024-05-14-092623_add_updated_by_column/up.sql new file mode 100644 index 0000000000..869f52cbfe --- /dev/null +++ b/migrations/2024-05-14-092623_add_updated_by_column/up.sql @@ -0,0 +1,6 @@ +-- Your SQL goes here +ALTER TABLE payment_methods ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); + +ALTER TABLE mandate ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); + +ALTER TABLE customers ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); \ No newline at end of file