diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index e70216c6f3..c4d7c993c1 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -11,7 +11,8 @@ use crate::{ payouts::{Payouts, PayoutsNew, PayoutsUpdate}, refund::{Refund, RefundNew, RefundUpdate}, reverse_lookup::{ReverseLookup, ReverseLookupNew}, - PaymentIntent, PaymentMethod, PaymentMethodNew, PaymentMethodUpdateInternal, PgPooledConn, + Mandate, MandateNew, MandateUpdateInternal, PaymentIntent, PaymentMethod, PaymentMethodNew, + PaymentMethodUpdateInternal, PgPooledConn, }; #[derive(Debug, Serialize, Deserialize)] @@ -40,6 +41,7 @@ impl DBOperation { Insertable::Customer(_) => "customer", Insertable::ReverseLookUp(_) => "reverse_lookup", Insertable::PaymentMethod(_) => "payment_method", + Insertable::Mandate(_) => "mandate", }, Self::Update { updatable } => match updatable { Updateable::PaymentIntentUpdate(_) => "payment_intent", @@ -50,6 +52,7 @@ impl DBOperation { Updateable::PayoutsUpdate(_) => "payouts", Updateable::PayoutAttemptUpdate(_) => "payout_attempt", Updateable::PaymentMethodUpdate(_) => "payment_method", + Updateable::MandateUpdate(_) => " mandate", }, } } @@ -66,6 +69,7 @@ pub enum DBResult { Payouts(Box), PayoutAttempt(Box), PaymentMethod(Box), + Mandate(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -99,6 +103,7 @@ impl DBOperation { Insertable::PaymentMethod(rev) => { DBResult::PaymentMethod(Box::new(rev.insert(conn).await?)) } + Insertable::Mandate(m) => DBResult::Mandate(Box::new(m.insert(conn).await?)), }, Self::Update { updatable } => match updatable { Updateable::PaymentIntentUpdate(a) => { @@ -124,6 +129,15 @@ impl DBOperation { .update_with_payment_method_id(conn, v.update_data) .await?, )), + Updateable::MandateUpdate(m) => DBResult::Mandate(Box::new( + Mandate::update_by_merchant_id_mandate_id( + conn, + &m.orig.merchant_id, + &m.orig.mandate_id, + m.update_data, + ) + .await?, + )), Updateable::CustomerUpdate(cust) => DBResult::Customer(Box::new( Customer::update_by_customer_id_merchant_id( conn, @@ -171,6 +185,7 @@ pub enum Insertable { Payouts(PayoutsNew), PayoutAttempt(PayoutAttemptNew), PaymentMethod(PaymentMethodNew), + Mandate(MandateNew), } #[derive(Debug, Serialize, Deserialize)] @@ -184,6 +199,7 @@ pub enum Updateable { PayoutsUpdate(PayoutsUpdateMems), PayoutAttemptUpdate(PayoutAttemptUpdateMems), PaymentMethodUpdate(PaymentMethodUpdateMems), + MandateUpdate(MandateUpdateMems), } #[derive(Debug, Serialize, Deserialize)] @@ -233,3 +249,9 @@ pub struct PaymentMethodUpdateMems { pub orig: PaymentMethod, pub update_data: PaymentMethodUpdateInternal, } + +#[derive(Debug, Serialize, Deserialize)] +pub struct MandateUpdateMems { + pub orig: Mandate, + pub update_data: MandateUpdateInternal, +} diff --git a/crates/diesel_models/src/mandate.rs b/crates/diesel_models/src/mandate.rs index 31c6ef62f2..39c43a4a17 100644 --- a/crates/diesel_models/src/mandate.rs +++ b/crates/diesel_models/src/mandate.rs @@ -5,7 +5,7 @@ use time::PrimitiveDateTime; use crate::{enums as storage_enums, schema::mandate}; -#[derive(Clone, Debug, Identifiable, Queryable)] +#[derive(Clone, Debug, Identifiable, Queryable, serde::Serialize, serde::Deserialize)] #[diesel(table_name = mandate)] pub struct Mandate { pub id: i32, @@ -35,7 +35,14 @@ pub struct Mandate { } #[derive( - router_derive::Setter, Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay, + router_derive::Setter, + Clone, + Debug, + Default, + Insertable, + router_derive::DebugAsDisplay, + serde::Serialize, + serde::Deserialize, )] #[diesel(table_name = mandate)] pub struct MandateNew { @@ -89,7 +96,15 @@ pub struct SingleUseMandate { pub currency: storage_enums::Currency, } -#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)] +#[derive( + Clone, + Debug, + Default, + AsChangeset, + router_derive::DebugAsDisplay, + serde::Serialize, + serde::Deserialize, +)] #[diesel(table_name = mandate)] pub struct MandateUpdateInternal { mandate_status: Option, @@ -140,3 +155,59 @@ impl From for MandateUpdateInternal { } } } + +impl MandateUpdateInternal { + pub fn apply_changeset(self, source: Mandate) -> Mandate { + let Self { + mandate_status, + amount_captured, + connector_mandate_ids, + connector_mandate_id, + payment_method_id, + original_payment_id, + } = self; + + Mandate { + mandate_status: mandate_status.unwrap_or(source.mandate_status), + amount_captured: amount_captured.map_or(source.amount_captured, Some), + connector_mandate_ids: connector_mandate_ids.map_or(source.connector_mandate_ids, Some), + connector_mandate_id: connector_mandate_id.map_or(source.connector_mandate_id, Some), + payment_method_id: payment_method_id.unwrap_or(source.payment_method_id), + original_payment_id: original_payment_id.map_or(source.original_payment_id, Some), + ..source + } + } +} + +impl From<&MandateNew> for Mandate { + fn from(mandate_new: &MandateNew) -> Self { + Self { + id: 0i32, + mandate_id: mandate_new.mandate_id.clone(), + customer_id: mandate_new.customer_id.clone(), + merchant_id: mandate_new.merchant_id.clone(), + payment_method_id: mandate_new.payment_method_id.clone(), + mandate_status: mandate_new.mandate_status, + mandate_type: mandate_new.mandate_type, + customer_accepted_at: mandate_new.customer_accepted_at, + customer_ip_address: mandate_new.customer_ip_address.clone(), + customer_user_agent: mandate_new.customer_user_agent.clone(), + network_transaction_id: mandate_new.network_transaction_id.clone(), + previous_attempt_id: mandate_new.previous_attempt_id.clone(), + created_at: mandate_new + .created_at + .unwrap_or_else(common_utils::date_time::now), + mandate_amount: mandate_new.mandate_amount, + mandate_currency: mandate_new.mandate_currency, + amount_captured: mandate_new.amount_captured, + connector: mandate_new.connector.clone(), + connector_mandate_id: mandate_new.connector_mandate_id.clone(), + start_date: mandate_new.start_date, + end_date: mandate_new.end_date, + metadata: mandate_new.metadata.clone(), + connector_mandate_ids: mandate_new.connector_mandate_ids.clone(), + original_payment_id: mandate_new.original_payment_id.clone(), + merchant_connector_id: mandate_new.merchant_connector_id.clone(), + } + } +} diff --git a/crates/diesel_models/src/query/mandate.rs b/crates/diesel_models/src/query/mandate.rs index 0baae87c51..6925cd12b6 100644 --- a/crates/diesel_models/src/query/mandate.rs +++ b/crates/diesel_models/src/query/mandate.rs @@ -65,14 +65,14 @@ impl Mandate { conn: &PgPooledConn, merchant_id: &str, mandate_id: &str, - mandate: MandateUpdate, + mandate: MandateUpdateInternal, ) -> StorageResult { generics::generic_update_with_results::<::Table, _, _, _>( conn, dsl::merchant_id .eq(merchant_id.to_owned()) .and(dsl::mandate_id.eq(mandate_id.to_owned())), - MandateUpdateInternal::from(mandate), + mandate, ) .await? .first() diff --git a/crates/router/src/core/mandate.rs b/crates/router/src/core/mandate.rs index 54fec6ab4a..b54091e492 100644 --- a/crates/router/src/core/mandate.rs +++ b/crates/router/src/core/mandate.rs @@ -23,7 +23,8 @@ use crate::{ mandates::{self, MandateResponseExt}, ConnectorData, GetToken, }, - domain, storage, + domain, + storage::{self, enums::MerchantStorageScheme}, transformers::ForeignFrom, }, utils::OptionExt, @@ -39,7 +40,11 @@ pub async fn get_mandate( let mandate = state .store .as_ref() - .find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id) + .find_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + &req.mandate_id, + merchant_account.storage_scheme, + ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; Ok(services::ApplicationResponse::Json( @@ -62,7 +67,11 @@ pub async fn revoke_mandate( ) -> RouterResponse { let db = state.store.as_ref(); let mandate = db - .find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &req.mandate_id) + .find_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + &req.mandate_id, + merchant_account.storage_scheme, + ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; match mandate.mandate_status { @@ -123,6 +132,8 @@ pub async fn revoke_mandate( storage::MandateUpdate::StatusUpdate { mandate_status: storage::enums::MandateStatus::Revoked, }, + mandate, + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; @@ -162,6 +173,7 @@ pub async fn update_connector_mandate_id( mandate_ids_opt: Option, payment_method_id: Option, resp: Result, + storage_scheme: MerchantStorageScheme, ) -> RouterResponse { let mandate_details = Option::foreign_from(resp); let connector_mandate_id = mandate_details @@ -176,7 +188,7 @@ pub async fn update_connector_mandate_id( //Ignore updation if the payment_attempt mandate_id or connector_mandate_id is not present if let Some((mandate_id, connector_id)) = mandate_ids_opt.zip(connector_mandate_id) { let mandate = db - .find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id) + .find_mandate_by_merchant_id_mandate_id(&merchant_account, &mandate_id, storage_scheme) .await .change_context(errors::ApiErrorResponse::MandateNotFound)?; @@ -199,6 +211,8 @@ pub async fn update_connector_mandate_id( &merchant_account, &mandate_id, update_mandate_details, + mandate, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::MandateUpdateFailed)?; @@ -262,6 +276,7 @@ pub async fn update_mandate_procedure( mandate: Mandate, merchant_id: &str, pm_id: Option, + storage_scheme: MerchantStorageScheme, ) -> errors::RouterResult> where FData: MandateBehaviour, @@ -276,13 +291,14 @@ where }; let old_record = payments::UpdateHistory { - connector_mandate_id: mandate.connector_mandate_id, - payment_method_id: mandate.payment_method_id, - original_payment_id: mandate.original_payment_id, + connector_mandate_id: mandate.connector_mandate_id.clone(), + payment_method_id: mandate.payment_method_id.clone(), + original_payment_id: mandate.original_payment_id.clone(), }; let mandate_ref = mandate .connector_mandate_ids + .clone() .parse_value::("Connector Reference Id") .change_context(errors::ApiErrorResponse::MandateDeserializationFailed)?; @@ -302,11 +318,12 @@ where .change_context(errors::ApiErrorResponse::InternalServerError) .map(masking::Secret::new)?; + let mandate_id = mandate.mandate_id.clone(); let _update_mandate_details = state .store .update_mandate_by_merchant_id_mandate_id( merchant_id, - &mandate.mandate_id, + &mandate_id, diesel_models::MandateUpdate::ConnectorMandateIdUpdate { connector_mandate_id: mandate_details .as_ref() @@ -316,6 +333,8 @@ where .unwrap_or("Error retrieving the payment_method_id".to_string()), original_payment_id: Some(resp.payment_id.clone()), }, + mandate, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::MandateUpdateFailed)?; @@ -327,6 +346,7 @@ pub async fn mandate_procedure( maybe_customer: &Option, pm_id: Option, merchant_connector_id: Option, + storage_scheme: MerchantStorageScheme, ) -> errors::RouterResult> where FData: MandateBehaviour, @@ -336,15 +356,16 @@ where Ok(_) => match resp.request.get_mandate_id() { Some(mandate_id) => { if let Some(ref mandate_id) = mandate_id.mandate_id { - let mandate = state + let orig_mandate = state .store .find_mandate_by_merchant_id_mandate_id( resp.merchant_id.as_ref(), mandate_id, + storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; - let mandate = match mandate.mandate_type { + let mandate = match orig_mandate.mandate_type { storage_enums::MandateType::SingleUse => state .store .update_mandate_by_merchant_id_mandate_id( @@ -353,6 +374,8 @@ where storage::MandateUpdate::StatusUpdate { mandate_status: storage_enums::MandateStatus::Revoked, }, + orig_mandate, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::MandateUpdateFailed), @@ -363,10 +386,12 @@ where mandate_id, storage::MandateUpdate::CaptureAmountUpdate { amount_captured: Some( - mandate.amount_captured.unwrap_or(0) + orig_mandate.amount_captured.unwrap_or(0) + resp.request.get_amount(), ), }, + orig_mandate, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::MandateUpdateFailed), @@ -451,7 +476,7 @@ where })); state .store - .insert_mandate(new_mandate_data) + .insert_mandate(new_mandate_data, storage_scheme) .await .to_duplicate_response(errors::ApiErrorResponse::DuplicateMandate)?; metrics::MANDATE_COUNT.add( diff --git a/crates/router/src/core/payments/flows/authorize_flow.rs b/crates/router/src/core/payments/flows/authorize_flow.rs index cccfab5a74..10a4206a2b 100644 --- a/crates/router/src/core/payments/flows/authorize_flow.rs +++ b/crates/router/src/core/payments/flows/authorize_flow.rs @@ -117,6 +117,7 @@ impl Feature for types::PaymentsAu maybe_customer, payment_method_id, connector.merchant_connector_id.clone(), + merchant_account.storage_scheme, ) .await?) } else { diff --git a/crates/router/src/core/payments/flows/setup_mandate_flow.rs b/crates/router/src/core/payments/flows/setup_mandate_flow.rs index d8acfe98f3..ff77efa7c4 100644 --- a/crates/router/src/core/payments/flows/setup_mandate_flow.rs +++ b/crates/router/src/core/payments/flows/setup_mandate_flow.rs @@ -123,6 +123,7 @@ impl Feature for types::Setup maybe_customer, pm_id, connector.merchant_connector_id.clone(), + merchant_account.storage_scheme, ) .await } @@ -265,6 +266,7 @@ impl types::SetupMandateRouterData { maybe_customer, pm_id, connector.merchant_connector_id.clone(), + merchant_account.storage_scheme, ) .await?) } @@ -348,7 +350,11 @@ impl types::SetupMandateRouterData { .0; let mandate = state .store - .find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, &mandate_id) + .find_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + &mandate_id, + merchant_account.storage_scheme, + ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; @@ -400,6 +406,7 @@ impl types::SetupMandateRouterData { mandate, &merchant_account.merchant_id, pm_id, + merchant_account.storage_scheme, ) .await } diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 9851ec1edc..bab8cf9f34 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -560,7 +560,11 @@ pub async fn get_token_for_recurring_mandate( let db = &*state.store; let mandate = db - .find_mandate_by_merchant_id_mandate_id(&merchant_account.merchant_id, mandate_id.as_str()) + .find_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + mandate_id.as_str(), + merchant_account.storage_scheme, + ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 5c5eeb046f..99a387fc8d 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -320,7 +320,7 @@ impl }) .async_and_then(|mandate_id| async { let mandate = db - .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id) + .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, storage_scheme) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound); Some(mandate.and_then(|mandate_obj| { diff --git a/crates/router/src/core/payments/operations/payment_response.rs b/crates/router/src/core/payments/operations/payment_response.rs index 50c795be2b..9bb8119d40 100644 --- a/crates/router/src/core/payments/operations/payment_response.rs +++ b/crates/router/src/core/payments/operations/payment_response.rs @@ -939,6 +939,7 @@ async fn payment_response_update_tracker( m_payment_data_mandate_id, m_payment_method_id, m_router_data_response, + storage_scheme, ) .await } diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index e722a05f76..bfd056e8f9 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -290,7 +290,7 @@ impl }) .async_and_then(|mandate_id| async { let mandate = db - .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id) + .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, merchant_account.storage_scheme) .await .change_context(errors::ApiErrorResponse::MandateNotFound); Some(mandate.and_then(|mandate_obj| { diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 278ac8f970..bb1efc9625 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -434,6 +434,7 @@ pub async fn mandates_incoming_webhook_flow( .find_mandate_by_merchant_id_mandate_id( &merchant_account.merchant_id, mandate_id.as_str(), + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, @@ -443,6 +444,7 @@ pub async fn mandates_incoming_webhook_flow( .find_mandate_by_merchant_id_connector_mandate_id( &merchant_account.merchant_id, connector_mandate_id.as_str(), + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, @@ -452,11 +454,14 @@ pub async fn mandates_incoming_webhook_flow( let mandate_status = common_enums::MandateStatus::foreign_try_from(event_type) .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) .attach_printable("event type to mandate status mapping failed")?; + let mandate_id = mandate.mandate_id.clone(); let updated_mandate = db .update_mandate_by_merchant_id_mandate_id( &merchant_account.merchant_id, - &mandate.mandate_id, + &mandate_id, storage::MandateUpdate::StatusUpdate { mandate_status }, + mandate, + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 095c26e05a..c32bcf6f6a 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -668,9 +668,10 @@ impl MandateInterface for KafkaStore { &self, merchant_id: &str, mandate_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id) + .find_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, storage_scheme) .await } @@ -678,9 +679,14 @@ impl MandateInterface for KafkaStore { &self, merchant_id: &str, connector_mandate_id: &str, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .find_mandate_by_merchant_id_connector_mandate_id(merchant_id, connector_mandate_id) + .find_mandate_by_merchant_id_connector_mandate_id( + merchant_id, + connector_mandate_id, + storage_scheme, + ) .await } @@ -698,10 +704,18 @@ impl MandateInterface for KafkaStore { &self, merchant_id: &str, mandate_id: &str, - mandate: storage::MandateUpdate, + mandate_update: storage::MandateUpdate, + mandate: storage::Mandate, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { self.diesel_store - .update_mandate_by_merchant_id_mandate_id(merchant_id, mandate_id, mandate) + .update_mandate_by_merchant_id_mandate_id( + merchant_id, + mandate_id, + mandate_update, + mandate, + storage_scheme, + ) .await } @@ -718,8 +732,11 @@ impl MandateInterface for KafkaStore { async fn insert_mandate( &self, mandate: storage::MandateNew, + storage_scheme: MerchantStorageScheme, ) -> CustomResult { - self.diesel_store.insert_mandate(mandate).await + self.diesel_store + .insert_mandate(mandate, storage_scheme) + .await } } diff --git a/crates/router/src/db/mandate.rs b/crates/router/src/db/mandate.rs index 273d34a68a..1751657e3f 100644 --- a/crates/router/src/db/mandate.rs +++ b/crates/router/src/db/mandate.rs @@ -1,11 +1,9 @@ -use error_stack::{report, ResultExt}; -use router_env::{instrument, tracing}; +use error_stack::ResultExt; -use super::{MockDb, Store}; +use super::MockDb; use crate::{ - connection, - core::{errors, errors::CustomResult}, - types::storage::{self, MandateDbExt}, + core::errors::{self, CustomResult}, + types::storage::{self as storage_types, enums::MerchantStorageScheme}, }; #[async_trait::async_trait] @@ -14,116 +12,439 @@ pub trait MandateInterface { &self, merchant_id: &str, mandate_id: &str, - ) -> CustomResult; + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn find_mandate_by_merchant_id_connector_mandate_id( &self, merchant_id: &str, connector_mandate_id: &str, - ) -> CustomResult; + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn find_mandate_by_merchant_id_customer_id( &self, merchant_id: &str, customer_id: &str, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; async fn update_mandate_by_merchant_id_mandate_id( &self, merchant_id: &str, mandate_id: &str, - mandate: storage::MandateUpdate, - ) -> CustomResult; + mandate_update: storage_types::MandateUpdate, + mandate: storage_types::Mandate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; async fn find_mandates_by_merchant_id( &self, merchant_id: &str, mandate_constraints: api_models::mandates::MandateListConstraints, - ) -> CustomResult, errors::StorageError>; + ) -> CustomResult, errors::StorageError>; async fn insert_mandate( &self, - mandate: storage::MandateNew, - ) -> CustomResult; + mandate: storage_types::MandateNew, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult; } -#[async_trait::async_trait] -impl MandateInterface for Store { - #[instrument(skip_all)] - async fn find_mandate_by_merchant_id_mandate_id( - &self, - merchant_id: &str, - mandate_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id) +#[cfg(feature = "kv_store")] +mod storage { + use common_utils::fallback_reverse_lookup_not_found; + use diesel_models::kv; + use error_stack::{report, ResultExt}; + use redis_interface::HsetnxReply; + use router_env::{instrument, tracing}; + use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + + use super::MandateInterface; + use crate::{ + connection, + core::errors::{self, utils::RedisErrorExt, CustomResult}, + db::reverse_lookup::ReverseLookupInterface, + services::Store, + types::storage::{self as storage_types, enums::MerchantStorageScheme, MandateDbExt}, + utils::db_utils, + }; + + #[async_trait::async_trait] + impl MandateInterface for Store { + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_mandate_id( + &self, + merchant_id: &str, + mandate_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Mandate::find_by_merchant_id_mandate_id( + &conn, + merchant_id, + mandate_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::MerchantIdMandateId { + merchant_id, + mandate_id, + }; + let field = format!("mandate_{}", mandate_id); + + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&field), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + } + } + + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_connector_mandate_id( + &self, + merchant_id: &str, + connector_mandate_id: &str, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + let database_call = || async { + storage_types::Mandate::find_by_merchant_id_connector_mandate_id( + &conn, + merchant_id, + connector_mandate_id, + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => database_call().await, + MerchantStorageScheme::RedisKv => { + let lookup_id = + format!("mid_{}_conn_mandate_{}", merchant_id, connector_mandate_id); + let lookup = fallback_reverse_lookup_not_found!( + self.get_lookup_by_lookup_id(&lookup_id, storage_scheme) + .await, + database_call().await + ); + + let key = PartitionKey::CombinationKey { + combination: &lookup.pk_id, + }; + + Box::pin(db_utils::try_redis_get_else_try_database_get( + async { + kv_wrapper( + self, + KvOperation::::HGet(&lookup.sk_id), + key, + ) + .await? + .try_into_hget() + }, + database_call, + )) + .await + } + } + } + + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_customer_id( + &self, + merchant_id: &str, + customer_id: &str, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn update_mandate_by_merchant_id_mandate_id( + &self, + merchant_id: &str, + mandate_id: &str, + mandate_update: storage_types::MandateUpdate, + mandate: storage_types::Mandate, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => { + storage_types::Mandate::update_by_merchant_id_mandate_id( + &conn, + merchant_id, + mandate_id, + storage_types::MandateUpdateInternal::from(mandate_update), + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + MerchantStorageScheme::RedisKv => { + let key = PartitionKey::MerchantIdMandateId { + merchant_id, + mandate_id, + }; + let field = format!("mandate_{}", mandate_id); + let key_str = key.to_string(); + + if let diesel_models::MandateUpdate::ConnectorMandateIdUpdate { + connector_mandate_id: Some(val), + .. + } = &mandate_update + { + let rev_lookup = diesel_models::ReverseLookupNew { + sk_id: field.clone(), + pk_id: key_str.clone(), + lookup_id: format!("mid_{}_conn_mandate_{}", merchant_id, val), + source: "mandate".to_string(), + updated_by: storage_scheme.to_string(), + }; + self.insert_reverse_lookup(rev_lookup, storage_scheme) + .await?; + } + + let m_update = diesel_models::MandateUpdateInternal::from(mandate_update); + let updated_mandate = m_update.clone().apply_changeset(mandate.clone()); + + let redis_value = serde_json::to_string(&updated_mandate) + .change_context(errors::StorageError::SerializationFailed)?; + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Update { + updatable: kv::Updateable::MandateUpdate(kv::MandateUpdateMems { + orig: mandate, + update_data: m_update, + }), + }, + }; + + kv_wrapper::<(), _, _>( + self, + KvOperation::::Hset( + (&field, redis_value), + redis_entry, + ), + key, + ) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hset() + .change_context(errors::StorageError::KVError)?; + + Ok(updated_mandate) + } + } + } + + #[instrument(skip_all)] + async fn find_mandates_by_merchant_id( + &self, + merchant_id: &str, + mandate_constraints: api_models::mandates::MandateListConstraints, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn insert_mandate( + &self, + mandate: storage_types::MandateNew, + storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + + match storage_scheme { + MerchantStorageScheme::PostgresOnly => mandate + .insert(&conn) + .await + .map_err(|error| report!(errors::StorageError::from(error))), + MerchantStorageScheme::RedisKv => { + let mandate_id = mandate.mandate_id.clone(); + let merchant_id = mandate.merchant_id.clone(); + let connector_mandate_id = mandate.connector_mandate_id.clone(); + + let key = PartitionKey::MerchantIdMandateId { + merchant_id: merchant_id.as_str(), + mandate_id: mandate_id.as_str(), + }; + let key_str = key.to_string(); + let field = format!("mandate_{}", mandate_id); + + let storage_mandate = storage_types::Mandate::from(&mandate); + + let redis_entry = kv::TypedSql { + op: kv::DBOperation::Insert { + insertable: kv::Insertable::Mandate(mandate), + }, + }; + + if let Some(connector_val) = connector_mandate_id { + let lookup_id = + format!("mid_{}_conn_mandate_{}", merchant_id, connector_val); + + let reverse_lookup_entry = diesel_models::ReverseLookupNew { + sk_id: field.clone(), + pk_id: key_str.clone(), + lookup_id, + source: "mandate".to_string(), + updated_by: storage_scheme.to_string(), + }; + + self.insert_reverse_lookup(reverse_lookup_entry, storage_scheme) + .await?; + } + + match kv_wrapper::( + self, + KvOperation::::HSetNx( + &field, + &storage_mandate, + redis_entry, + ), + key, + ) + .await + .map_err(|err| err.to_redis_failed_response(&key_str))? + .try_into_hsetnx() + { + Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue { + entity: "mandate", + key: Some(storage_mandate.mandate_id), + } + .into()), + Ok(HsetnxReply::KeySet) => Ok(storage_mandate), + Err(er) => Err(er).change_context(errors::StorageError::KVError), + } + } + } + } + } +} + +#[cfg(not(feature = "kv_store"))] +mod storage { + use error_stack::report; + use router_env::{instrument, tracing}; + + use super::MandateInterface; + use crate::{ + connection, + core::errors::{self, CustomResult}, + services::Store, + types::storage::{self as storage_types, enums::MerchantStorageScheme, MandateDbExt}, + }; + + #[async_trait::async_trait] + impl MandateInterface for Store { + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_mandate_id( + &self, + merchant_id: &str, + mandate_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } + + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_connector_mandate_id( + &self, + merchant_id: &str, + connector_mandate_id: &str, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::find_by_merchant_id_connector_mandate_id( + &conn, + merchant_id, + connector_mandate_id, + ) .await .map_err(|error| report!(errors::StorageError::from(error))) - } + } - #[instrument(skip_all)] - async fn find_mandate_by_merchant_id_connector_mandate_id( - &self, - merchant_id: &str, - connector_mandate_id: &str, - ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; - storage::Mandate::find_by_merchant_id_connector_mandate_id( - &conn, - merchant_id, - connector_mandate_id, - ) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } + #[instrument(skip_all)] + async fn find_mandate_by_merchant_id_customer_id( + &self, + merchant_id: &str, + customer_id: &str, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } - #[instrument(skip_all)] - async fn find_mandate_by_merchant_id_customer_id( - &self, - merchant_id: &str, - customer_id: &str, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - storage::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id) + #[instrument(skip_all)] + async fn update_mandate_by_merchant_id_mandate_id( + &self, + merchant_id: &str, + mandate_id: &str, + mandate_update: storage_types::MandateUpdate, + _mandate: storage_types::Mandate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + storage_types::Mandate::update_by_merchant_id_mandate_id( + &conn, + merchant_id, + mandate_id, + storage_types::MandateUpdateInternal::from(mandate_update), + ) .await .map_err(|error| report!(errors::StorageError::from(error))) - } + } - #[instrument(skip_all)] - async fn update_mandate_by_merchant_id_mandate_id( - &self, - merchant_id: &str, - mandate_id: &str, - mandate: storage::MandateUpdate, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - storage::Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } + #[instrument(skip_all)] + async fn find_mandates_by_merchant_id( + &self, + merchant_id: &str, + mandate_constraints: api_models::mandates::MandateListConstraints, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + storage_types::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } - #[instrument(skip_all)] - async fn find_mandates_by_merchant_id( - &self, - merchant_id: &str, - mandate_constraints: api_models::mandates::MandateListConstraints, - ) -> CustomResult, errors::StorageError> { - let conn = connection::pg_connection_read(self).await?; - storage::Mandate::filter_by_constraints(&conn, merchant_id, mandate_constraints) - .await - .map_err(|error| report!(errors::StorageError::from(error))) - } - - #[instrument(skip_all)] - async fn insert_mandate( - &self, - mandate: storage::MandateNew, - ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; - mandate - .insert(&conn) - .await - .map_err(|error| report!(errors::StorageError::from(error))) + #[instrument(skip_all)] + async fn insert_mandate( + &self, + mandate: storage_types::MandateNew, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { + let conn = connection::pg_connection_write(self).await?; + mandate + .insert(&conn) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + } } } @@ -133,7 +454,8 @@ impl MandateInterface for MockDb { &self, merchant_id: &str, mandate_id: &str, - ) -> CustomResult { + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { self.mandates .lock() .await @@ -148,7 +470,8 @@ impl MandateInterface for MockDb { &self, merchant_id: &str, connector_mandate_id: &str, - ) -> CustomResult { + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { self.mandates .lock() .await @@ -166,7 +489,7 @@ impl MandateInterface for MockDb { &self, merchant_id: &str, customer_id: &str, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { return Ok(self .mandates .lock() @@ -183,8 +506,10 @@ impl MandateInterface for MockDb { &self, merchant_id: &str, mandate_id: &str, - mandate_update: storage::MandateUpdate, - ) -> CustomResult { + mandate_update: storage_types::MandateUpdate, + _mandate: storage_types::Mandate, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let mut mandates = self.mandates.lock().await; match mandates .iter_mut() @@ -192,13 +517,13 @@ impl MandateInterface for MockDb { { Some(mandate) => { match mandate_update { - storage::MandateUpdate::StatusUpdate { mandate_status } => { + storage_types::MandateUpdate::StatusUpdate { mandate_status } => { mandate.mandate_status = mandate_status; } - storage::MandateUpdate::CaptureAmountUpdate { amount_captured } => { + storage_types::MandateUpdate::CaptureAmountUpdate { amount_captured } => { mandate.amount_captured = amount_captured; } - storage::MandateUpdate::ConnectorReferenceUpdate { + storage_types::MandateUpdate::ConnectorReferenceUpdate { connector_mandate_ids, } => { mandate.connector_mandate_ids = connector_mandate_ids; @@ -228,7 +553,7 @@ impl MandateInterface for MockDb { &self, merchant_id: &str, mandate_constraints: api_models::mandates::MandateListConstraints, - ) -> CustomResult, errors::StorageError> { + ) -> CustomResult, errors::StorageError> { let mandates = self.mandates.lock().await; let mandates_iter = mandates.iter().filter(|mandate| { let mut checker = mandate.merchant_id == merchant_id; @@ -263,7 +588,7 @@ impl MandateInterface for MockDb { mandate_constraints.offset.unwrap_or(0) }) as usize; - let mandates: Vec = if let Some(limit) = mandate_constraints.limit { + let mandates: Vec = if let Some(limit) = mandate_constraints.limit { #[allow(clippy::as_conversions)] mandates_iter .skip(offset) @@ -278,10 +603,11 @@ impl MandateInterface for MockDb { async fn insert_mandate( &self, - mandate_new: storage::MandateNew, - ) -> CustomResult { + mandate_new: storage_types::MandateNew, + _storage_scheme: MerchantStorageScheme, + ) -> CustomResult { let mut mandates = self.mandates.lock().await; - let mandate = storage::Mandate { + let mandate = storage_types::Mandate { id: i32::try_from(mandates.len()).change_context(errors::StorageError::MockDbError)?, mandate_id: mandate_new.mandate_id.clone(), customer_id: mandate_new.customer_id, diff --git a/crates/router/src/utils.rs b/crates/router/src/utils.rs index 398ff7f30d..0abe5ea936 100644 --- a/crates/router/src/utils.rs +++ b/crates/router/src/utils.rs @@ -298,6 +298,7 @@ pub async fn find_payment_intent_from_mandate_id_type( .find_mandate_by_merchant_id_mandate_id( &merchant_account.merchant_id, mandate_id.as_str(), + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, @@ -305,6 +306,7 @@ pub async fn find_payment_intent_from_mandate_id_type( .find_mandate_by_merchant_id_connector_mandate_id( &merchant_account.merchant_id, connector_mandate_id.as_str(), + merchant_account.storage_scheme, ) .await .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 91e308888b..f139641a56 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -12,6 +12,7 @@ pub mod customers; pub mod database; pub mod errors; mod lookup; +pub mod mandate; pub mod metrics; pub mod mock_db; pub mod payment_method; @@ -372,6 +373,15 @@ impl UniqueConstraints for diesel_models::PaymentMethod { } } +impl UniqueConstraints for diesel_models::Mandate { + fn unique_constraints(&self) -> Vec { + vec![format!("mand_{}_{}", self.merchant_id, self.mandate_id)] + } + fn table_name(&self) -> &str { + "Mandate" + } +} + impl UniqueConstraints for diesel_models::Customer { fn unique_constraints(&self) -> Vec { vec![format!( diff --git a/crates/storage_impl/src/mandate.rs b/crates/storage_impl/src/mandate.rs new file mode 100644 index 0000000000..10e1b193ee --- /dev/null +++ b/crates/storage_impl/src/mandate.rs @@ -0,0 +1,5 @@ +use diesel_models::Mandate; + +use crate::redis::kv_store::KvStorePartition; + +impl KvStorePartition for Mandate {} diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 2b046ddb41..7853763f32 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -40,6 +40,10 @@ pub enum PartitionKey<'a> { merchant_id: &'a str, payout_attempt_id: &'a str, }, + MerchantIdMandateId { + merchant_id: &'a str, + mandate_id: &'a str, + }, } // PartitionKey::MerchantIdPaymentId {merchant_id, payment_id} impl<'a> std::fmt::Display for PartitionKey<'a> { @@ -62,6 +66,10 @@ impl<'a> std::fmt::Display for PartitionKey<'a> { merchant_id, payout_attempt_id, } => f.write_str(&format!("mid_{merchant_id}_poa_{payout_attempt_id}")), + PartitionKey::MerchantIdMandateId { + merchant_id, + mandate_id, + } => f.write_str(&format!("mid_{merchant_id}_mandate_{mandate_id}")), } } }