From f76f3e2f54ca518928b6f0457f1338c63e78d7e7 Mon Sep 17 00:00:00 2001 From: ItsMeShashank Date: Fri, 9 Dec 2022 13:10:44 +0530 Subject: [PATCH] feat(router): dynamically toggle KV for merchant and refactoring around it (#79) --- crates/router/src/core/admin.rs | 20 +- crates/router/src/core/payments.rs | 40 +- crates/router/src/core/payments/flows.rs | 6 +- .../src/core/payments/flows/authorize_flow.rs | 3 + .../src/core/payments/flows/cancel_flow.rs | 7 +- .../src/core/payments/flows/capture_flow.rs | 5 +- .../src/core/payments/flows/psync_flow.rs | 7 +- .../src/core/payments/flows/verfiy_flow.rs | 3 + crates/router/src/core/payments/helpers.rs | 3 +- crates/router/src/core/payments/operations.rs | 22 +- .../payments/operations/payment_cancel.rs | 27 +- .../payments/operations/payment_capture.rs | 33 +- .../payments/operations/payment_confirm.rs | 28 +- .../payments/operations/payment_create.rs | 93 +++-- .../operations/payment_method_validate.rs | 51 ++- .../payments/operations/payment_response.rs | 49 ++- .../payments/operations/payment_session.rs | 28 +- .../core/payments/operations/payment_start.rs | 27 +- .../payments/operations/payment_status.rs | 46 +- .../payments/operations/payment_update.rs | 28 +- crates/router/src/core/refunds.rs | 95 ++++- crates/router/src/core/refunds/validator.rs | 3 +- crates/router/src/db/address.rs | 1 + crates/router/src/db/connector_response.rs | 13 +- crates/router/src/db/merchant_account.rs | 3 +- crates/router/src/db/payment_attempt.rs | 394 +++++++++++------- crates/router/src/db/payment_intent.rs | 303 ++++++++------ crates/router/src/db/refund.rs | 35 +- crates/router/src/lib.rs | 1 - crates/router/src/schema.rs | 1 + crates/router/src/services/api.rs | 7 +- crates/router/src/types/storage/enums.rs | 25 +- .../src/types/storage/merchant_account.rs | 1 + .../src/types/storage/payment_attempt.rs | 25 +- .../types/storage/query/payment_attempt.rs | 8 +- .../src/types/storage/query/payment_intent.rs | 8 +- .../down.sql | 5 + .../up.sql | 8 + 38 files changed, 980 insertions(+), 482 deletions(-) create mode 100644 migrations/2022-12-07-055441_add_use_kv_to_merchant_account/down.sql create mode 100644 migrations/2022-12-07-055441_add_use_kv_to_merchant_account/up.sql diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 947610868a..9ffac79673 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -274,7 +274,7 @@ pub async fn create_payment_connector( req: api::PaymentConnectorCreate, merchant_id: &String, ) -> RouterResponse { - store + let _merchant_account = store .find_merchant_account_by_merchant_id(merchant_id) .await .map_err(|error| { @@ -332,6 +332,13 @@ pub async fn retrieve_payment_connector( merchant_id: String, merchant_connector_id: i32, ) -> RouterResponse { + let _merchant_account = store + .find_merchant_account_by_merchant_id(&merchant_id) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + })?; + let mca = store .find_by_merchant_connector_account_merchant_id_merchant_connector_id( &merchant_id, @@ -366,11 +373,13 @@ pub async fn update_payment_connector( merchant_connector_id: i32, req: api::PaymentConnectorCreate, ) -> RouterResponse { - db.find_merchant_account_by_merchant_id(merchant_id) + let _merchant_account = db + .find_merchant_account_by_merchant_id(merchant_id) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) })?; + let mca = db .find_by_merchant_connector_account_merchant_id_merchant_connector_id( merchant_id, @@ -430,6 +439,13 @@ pub async fn delete_payment_connector( merchant_id: String, merchant_connector_id: i32, ) -> RouterResponse { + let _merchant_account = db + .find_merchant_account_by_merchant_id(&merchant_id) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + })?; + let is_deleted = db .delete_merchant_connector_account_by_merchant_id_merchant_connector_id( &merchant_id, diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index cea9cd2996..1c9f08b0be 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -65,21 +65,22 @@ where let operation: BoxedOperation = Box::new(operation); - let (operation, merchant_id, payment_id, mandate_type) = operation + let (operation, validate_result) = operation .to_validate_request()? .validate_request(&req, &merchant_account)?; - tracing::Span::current().record("payment_id", &format!("{:?}", payment_id)); + tracing::Span::current().record("payment_id", &format!("{:?}", validate_result.payment_id)); let (operation, mut payment_data, customer_details) = operation .to_get_tracker()? .get_trackers( state, - &payment_id, - merchant_id, + &validate_result.payment_id, + validate_result.merchant_id, connector.connector_name, &req, - mandate_type, + validate_result.mandate_type, + validate_result.storage_scheme, ) .await?; @@ -89,7 +90,7 @@ where &*state.store, &mut payment_data, customer_details, - merchant_id, + validate_result.merchant_id, ) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; @@ -103,13 +104,20 @@ where &payment_data.payment_attempt, &payment_data.payment_method_data, &payment_data.token, + validate_result.storage_scheme, ) .await?; payment_data.payment_method_data = payment_method_data; let (operation, mut payment_data) = operation .to_update_tracker()? - .update_trackers(&*state.store, &payment_id, payment_data, customer.clone()) + .update_trackers( + &*state.store, + &validate_result.payment_id, + payment_data, + customer.clone(), + validate_result.storage_scheme, + ) .await?; operation @@ -121,7 +129,7 @@ where payment_data = call_connector_service( state, &merchant_account, - &payment_id, + &validate_result.payment_id, connector, &operation, payment_data, @@ -295,13 +303,20 @@ where customer, payment_data, call_connector_action, + merchant_account.storage_scheme, ) .await; let response = helpers::amap(res, |response| async { let operation = helpers::response_operation::(); let payment_data = operation .to_post_update_tracker()? - .update_tracker(db, payment_id, payment_data, Some(response)) + .update_tracker( + db, + payment_id, + payment_data, + Some(response), + merchant_account.storage_scheme, + ) .await?; Ok(payment_data) }) @@ -451,9 +466,10 @@ pub async fn list_payments( ) -> RouterResponse { validate_payment_list_request(&constraints)?; let merchant_id = &merchant.merchant_id; - let payment_intent = filter_by_constraints(db, &constraints, merchant_id) - .await - .map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; + let payment_intent = + filter_by_constraints(db, &constraints, merchant_id, merchant.storage_scheme) + .await + .map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; let data: Vec = payment_intent.into_iter().map(From::from).collect(); utils::when( diff --git a/crates/router/src/core/payments/flows.rs b/crates/router/src/core/payments/flows.rs index 68787cc6a8..18e75a7de4 100644 --- a/crates/router/src/core/payments/flows.rs +++ b/crates/router/src/core/payments/flows.rs @@ -11,7 +11,10 @@ use crate::{ core::{errors::RouterResult, payments}, routes::AppState, services, - types::{self, api, storage}, + types::{ + self, api, + storage::{self, enums}, + }, }; #[async_trait] @@ -33,6 +36,7 @@ pub trait Feature { maybe_customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where Self: std::marker::Sized, diff --git a/crates/router/src/core/payments/flows/authorize_flow.rs b/crates/router/src/core/payments/flows/authorize_flow.rs index 357051db0b..7519d244fd 100644 --- a/crates/router/src/core/payments/flows/authorize_flow.rs +++ b/crates/router/src/core/payments/flows/authorize_flow.rs @@ -60,6 +60,7 @@ impl Feature customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where dyn api::Connector: services::ConnectorIntegration< @@ -75,6 +76,7 @@ impl Feature customer, Some(true), call_connector_action, + storage_scheme, ) .await; @@ -92,6 +94,7 @@ impl PaymentsAuthorizeRouterData { maybe_customer: &Option, confirm: Option, call_connector_action: payments::CallConnectorAction, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult where dyn api::Connector + Sync: services::ConnectorIntegration< diff --git a/crates/router/src/core/payments/flows/cancel_flow.rs b/crates/router/src/core/payments/flows/cancel_flow.rs index 61d3b5a90f..5b6b0f9ba6 100644 --- a/crates/router/src/core/payments/flows/cancel_flow.rs +++ b/crates/router/src/core/payments/flows/cancel_flow.rs @@ -8,7 +8,11 @@ use crate::{ }, routes::AppState, services, - types::{self, api, storage, PaymentsCancelRouterData, PaymentsResponseData}, + types::{ + self, api, + storage::{self, enums}, + PaymentsCancelRouterData, PaymentsResponseData, + }, }; #[async_trait] @@ -41,6 +45,7 @@ impl Feature customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + _storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where dyn api::Connector: services::ConnectorIntegration< diff --git a/crates/router/src/core/payments/flows/capture_flow.rs b/crates/router/src/core/payments/flows/capture_flow.rs index ef4681b1af..0bda242bc7 100644 --- a/crates/router/src/core/payments/flows/capture_flow.rs +++ b/crates/router/src/core/payments/flows/capture_flow.rs @@ -9,7 +9,9 @@ use crate::{ routes::AppState, services, types::{ - self, api, storage, PaymentsCaptureData, PaymentsCaptureRouterData, PaymentsResponseData, + self, api, + storage::{self, enums}, + PaymentsCaptureData, PaymentsCaptureRouterData, PaymentsResponseData, }, }; @@ -44,6 +46,7 @@ impl Feature customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + _storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where dyn api::Connector: services::ConnectorIntegration< diff --git a/crates/router/src/core/payments/flows/psync_flow.rs b/crates/router/src/core/payments/flows/psync_flow.rs index 38d50228ea..9b269340ed 100644 --- a/crates/router/src/core/payments/flows/psync_flow.rs +++ b/crates/router/src/core/payments/flows/psync_flow.rs @@ -8,7 +8,11 @@ use crate::{ }, routes::AppState, services, - types::{self, api, storage, PaymentsResponseData, PaymentsSyncData, PaymentsSyncRouterData}, + types::{ + self, api, + storage::{self, enums}, + PaymentsResponseData, PaymentsSyncData, PaymentsSyncRouterData, + }, }; #[async_trait] @@ -43,6 +47,7 @@ impl Feature customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + _storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where dyn api::Connector: services::ConnectorIntegration< diff --git a/crates/router/src/core/payments/flows/verfiy_flow.rs b/crates/router/src/core/payments/flows/verfiy_flow.rs index fa0665cb87..98f1e15fa7 100644 --- a/crates/router/src/core/payments/flows/verfiy_flow.rs +++ b/crates/router/src/core/payments/flows/verfiy_flow.rs @@ -47,6 +47,7 @@ impl Feature for types::VerifyRouterData customer: &Option, payment_data: PaymentData, call_connector_action: payments::CallConnectorAction, + storage_scheme: enums::MerchantStorageScheme, ) -> (RouterResult, PaymentData) where dyn api::Connector: services::ConnectorIntegration< @@ -62,6 +63,7 @@ impl Feature for types::VerifyRouterData customer, Some(true), call_connector_action, + storage_scheme, ) .await; @@ -77,6 +79,7 @@ impl types::VerifyRouterData { maybe_customer: &Option, confirm: Option, call_connector_action: payments::CallConnectorAction, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult where dyn api::Connector + Sync: services::ConnectorIntegration< diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 6c19bafa10..19f7f2e8ab 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -807,9 +807,10 @@ pub(super) async fn filter_by_constraints( db: &dyn StorageInterface, constraints: &api::PaymentListConstraints, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let result = db - .filter_payment_intent_by_constraints(merchant_id, constraints) + .filter_payment_intent_by_constraints(merchant_id, constraints, storage_scheme) .await?; Ok(result) } diff --git a/crates/router/src/core/payments/operations.rs b/crates/router/src/core/payments/operations.rs index 6f1396148a..8a24b41c9d 100644 --- a/crates/router/src/core/payments/operations.rs +++ b/crates/router/src/core/payments/operations.rs @@ -68,18 +68,20 @@ pub trait Operation: Send + std::fmt::Debug { } } +pub struct ValidateResult<'a> { + pub merchant_id: &'a str, + pub payment_id: api::PaymentIdType, + pub mandate_type: Option, + pub storage_scheme: enums::MerchantStorageScheme, +} + #[allow(clippy::type_complexity)] pub trait ValidateRequest { fn validate_request<'a, 'b>( &'b self, request: &R, merchant_account: &'a storage::MerchantAccount, - ) -> RouterResult<( - BoxedOperation<'b, F, R>, - &'a str, - api::PaymentIdType, - Option, - )>; + ) -> RouterResult<(BoxedOperation<'b, F, R>, ValidateResult<'a>)>; } #[async_trait] @@ -93,6 +95,7 @@ pub trait GetTracker: Send { connector: types::Connector, request: &R, mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'a, F, R>, D, Option)>; } @@ -116,6 +119,7 @@ pub trait Domain: Send + Sync { payment_attempt: &storage::PaymentAttempt, request: &Option, token: &Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'a, F, R>, Option)>; async fn add_task_to_process_tracker<'a>( @@ -135,6 +139,7 @@ pub trait UpdateTracker: Send { payment_id: &api::PaymentIdType, payment_data: D, customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, R>, D)> where F: 'b + Send; @@ -148,6 +153,7 @@ pub trait PostUpdateTracker: Send { payment_id: &api::PaymentIdType, payment_data: D, response: Option>, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult where F: 'b + Send; @@ -192,6 +198,7 @@ where payment_attempt: &storage::PaymentAttempt, request: &Option, token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRequest>, Option, @@ -280,6 +287,7 @@ where payment_attempt: &storage::PaymentAttempt, request: &Option, token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRetrieveRequest>, Option, @@ -336,6 +344,7 @@ where _payment_attempt: &storage::PaymentAttempt, _request: &Option, _token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsCaptureRequest>, Option, @@ -384,6 +393,7 @@ where _payment_attempt: &storage::PaymentAttempt, _request: &Option, _token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsCancelRequest>, Option, diff --git a/crates/router/src/core/payments/operations/payment_cancel.rs b/crates/router/src/core/payments/operations/payment_cancel.rs index efe2ee53d9..8ef176cc30 100644 --- a/crates/router/src/core/payments/operations/payment_cancel.rs +++ b/crates/router/src/core/payments/operations/payment_cancel.rs @@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{CustomerDetails, PaymentAddress, PaymentData}, + payments::{operations, CustomerDetails, PaymentAddress, PaymentData}, }, db::StorageInterface, routes::AppState, @@ -36,6 +36,7 @@ impl GetTracker, api::PaymentsCancelRequest> _connector: Connector, request: &api::PaymentsCancelRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsCancelRequest>, PaymentData, @@ -47,14 +48,18 @@ impl GetTracker, api::PaymentsCancelRequest> .change_context(errors::ApiErrorResponse::PaymentNotFound)?; let payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) })?; let mut payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -65,6 +70,7 @@ impl GetTracker, api::PaymentsCancelRequest> &payment_attempt.payment_id, &payment_attempt.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -116,6 +122,7 @@ impl UpdateTracker, api::PaymentsCancelRequest> for _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, _customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsCancelRequest>, PaymentData, @@ -131,6 +138,7 @@ impl UpdateTracker, api::PaymentsCancelRequest> for status: enums::AttemptStatus::VoidInitiated, cancellation_reason, }, + storage_scheme, ) .await .map_err(|err| err.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; @@ -147,15 +155,16 @@ impl ValidateRequest for Payment merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsCancelRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()), - None, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(request.payment_id.to_owned()), + mandate_type: None, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/payments/operations/payment_capture.rs b/crates/router/src/core/payments/operations/payment_capture.rs index 375e603ef8..6efc21c3fd 100644 --- a/crates/router/src/core/payments/operations/payment_capture.rs +++ b/crates/router/src/core/payments/operations/payment_capture.rs @@ -8,11 +8,16 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{self, helpers}, + payments::{self, helpers, operations}, }, db::StorageInterface, routes::AppState, - types::{api, api::PaymentsCaptureRequest, storage, Connector}, + types::{ + api, + api::PaymentsCaptureRequest, + storage::{self, enums}, + Connector, + }, utils::OptionExt, }; @@ -33,6 +38,7 @@ impl GetTracker, api::PaymentsCaptu _connector: Connector, request: &PaymentsCaptureRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsCaptureRequest>, payments::PaymentData, @@ -46,7 +52,7 @@ impl GetTracker, api::PaymentsCaptu .change_context(errors::ApiErrorResponse::PaymentNotFound)?; payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -57,7 +63,11 @@ impl GetTracker, api::PaymentsCaptu helpers::validate_amount_to_capture(payment_intent.amount, request.amount_to_capture)?; payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -82,6 +92,7 @@ impl GetTracker, api::PaymentsCaptu &payment_attempt.payment_id, &payment_attempt.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -140,6 +151,7 @@ impl UpdateTracker, api::PaymentsCaptureRe _payment_id: &api::PaymentIdType, payment_data: payments::PaymentData, _customer: Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsCaptureRequest>, payments::PaymentData, @@ -159,9 +171,7 @@ impl ValidateRequest for Paymen merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsCaptureRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let payment_id = request .payment_id @@ -170,9 +180,12 @@ impl ValidateRequest for Paymen Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(payment_id.to_owned()), - None, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(payment_id.to_owned()), + mandate_type: None, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/payments/operations/payment_confirm.rs b/crates/router/src/core/payments/operations/payment_confirm.rs index d56dab264c..6bc12c4083 100644 --- a/crates/router/src/core/payments/operations/payment_confirm.rs +++ b/crates/router/src/core/payments/operations/payment_confirm.rs @@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{helpers, CustomerDetails, PaymentAddress, PaymentData}, + payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData}, utils as core_utils, }, db::StorageInterface, @@ -37,6 +37,7 @@ impl GetTracker, api::PaymentsRequest> for Pa _connector: Connector, request: &api::PaymentsRequest, mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRequest>, PaymentData, @@ -54,7 +55,7 @@ impl GetTracker, api::PaymentsRequest> for Pa .await?; payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -78,7 +79,11 @@ impl GetTracker, api::PaymentsRequest> for Pa })?; payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -95,6 +100,7 @@ impl GetTracker, api::PaymentsRequest> for Pa &payment_attempt.payment_id, &payment_attempt.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -167,6 +173,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, _customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData)> where F: 'b + Send, @@ -194,6 +201,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen payment_method, browser_info, }, + storage_scheme, ) .await .map_err(|error| { @@ -213,6 +221,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen shipping_address_id: shipping_address, billing_address_id: billing_address, }, + storage_scheme, ) .await .map_err(|error| { @@ -231,9 +240,7 @@ impl ValidateRequest for PaymentConfir merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let given_payment_id = match &request.payment_id { Some(id_type) => Some( @@ -256,9 +263,12 @@ impl ValidateRequest for PaymentConfir Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(payment_id), - mandate_type, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(payment_id), + mandate_type, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/payments/operations/payment_create.rs b/crates/router/src/core/payments/operations/payment_create.rs index 7e6e2ffb1a..bde66ba9c4 100644 --- a/crates/router/src/core/payments/operations/payment_create.rs +++ b/crates/router/src/core/payments/operations/payment_create.rs @@ -11,7 +11,7 @@ use crate::{ consts, core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{self, helpers, CustomerDetails, PaymentAddress, PaymentData}, + payments::{self, helpers, operations, CustomerDetails, PaymentAddress, PaymentData}, utils as core_utils, }, db::StorageInterface, @@ -40,6 +40,7 @@ impl GetTracker, api::PaymentsRequest> for Pa connector: types::Connector, request: &api::PaymentsRequest, mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRequest>, PaymentData, @@ -79,15 +80,18 @@ impl GetTracker, api::PaymentsRequest> for Pa })?; payment_attempt = match db - .insert_payment_attempt(Self::make_payment_attempt( - &payment_id, - merchant_id, - connector, - money, - payment_method_type, - request, - browser_info, - )) + .insert_payment_attempt( + Self::make_payment_attempt( + &payment_id, + merchant_id, + connector, + money, + payment_method_type, + request, + browser_info, + ), + storage_scheme, + ) .await { Ok(payment_attempt) => Ok(payment_attempt), @@ -95,25 +99,32 @@ impl GetTracker, api::PaymentsRequest> for Pa Err(err) => match err.current_context() { errors::StorageError::DatabaseError(errors::DatabaseError::UniqueViolation) => { is_update = true; - db.find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) - .await - .map_err(|error| { - error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) - }) + db.find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) + }) } _ => Err(err).change_context(errors::ApiErrorResponse::InternalServerError), }, }?; payment_intent = match db - .insert_payment_intent(Self::make_payment_intent( - &payment_id, - merchant_id, - &connector.to_string(), - money, - request, - shipping_address.clone().map(|x| x.address_id), - billing_address.clone().map(|x| x.address_id), - )) + .insert_payment_intent( + Self::make_payment_intent( + &payment_id, + merchant_id, + &connector.to_string(), + money, + request, + shipping_address.clone().map(|x| x.address_id), + billing_address.clone().map(|x| x.address_id), + ), + storage_scheme, + ) .await { Ok(payment_intent) => Ok(payment_intent), @@ -121,18 +132,25 @@ impl GetTracker, api::PaymentsRequest> for Pa Err(err) => match err.current_context() { errors::StorageError::DatabaseError(errors::DatabaseError::UniqueViolation) => { is_update = true; - db.find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) - .await - .map_err(|error| { - error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) - }) + db.find_payment_intent_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) + }) } _ => Err(err).change_context(errors::ApiErrorResponse::InternalServerError), }, }?; connector_response = match db - .insert_connector_response(Self::make_connector_response(&payment_attempt)) + .insert_connector_response( + Self::make_connector_response(&payment_attempt), + storage_scheme, + ) .await { Ok(connector_resp) => Ok(connector_resp), @@ -195,6 +213,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, _customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData)> where F: 'b + Send, @@ -225,6 +244,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen shipping_address_id: None, billing_address_id: None, }, + storage_scheme, ) .await .map_err(|error| { @@ -248,9 +268,7 @@ impl ValidateRequest for PaymentCreate merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let given_payment_id = match &request.payment_id { Some(id_type) => Some( @@ -281,9 +299,12 @@ impl ValidateRequest for PaymentCreate Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(payment_id), - mandate_type, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(payment_id), + mandate_type, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/payments/operations/payment_method_validate.rs b/crates/router/src/core/payments/operations/payment_method_validate.rs index b79cc73bcd..cd2a5cd692 100644 --- a/crates/router/src/core/payments/operations/payment_method_validate.rs +++ b/crates/router/src/core/payments/operations/payment_method_validate.rs @@ -12,7 +12,7 @@ use crate::{ consts, core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{self, helpers, Operation, PaymentData}, + payments::{self, helpers, operations, Operation, PaymentData}, utils as core_utils, }, db::StorageInterface, @@ -36,9 +36,7 @@ impl ValidateRequest for PaymentMethodVa merchant_account: &'a types::storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::VerifyRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let request_merchant_id = request.merchant_id.as_deref(); helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id) @@ -49,9 +47,12 @@ impl ValidateRequest for PaymentMethodVa Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(validation_id), - mandate_type, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(validation_id), + mandate_type, + storage_scheme: merchant_account.storage_scheme, + }, )) } } @@ -67,6 +68,7 @@ impl GetTracker, api::VerifyRequest> for Paym connector: types::Connector, request: &api::VerifyRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::VerifyRequest>, PaymentData, @@ -80,13 +82,16 @@ impl GetTracker, api::VerifyRequest> for Paym .change_context(errors::ApiErrorResponse::InternalServerError)?; payment_attempt = match db - .insert_payment_attempt(Self::make_payment_attempt( - &payment_id, - merchant_id, - connector, - request.payment_method, - request, - )) + .insert_payment_attempt( + Self::make_payment_attempt( + &payment_id, + merchant_id, + connector, + request.payment_method, + request, + ), + storage_scheme, + ) .await { Ok(payment_attempt) => Ok(payment_attempt), @@ -96,12 +101,10 @@ impl GetTracker, api::VerifyRequest> for Paym }?; payment_intent = match db - .insert_payment_intent(Self::make_payment_intent( - &payment_id, - merchant_id, - connector, - request, - )) + .insert_payment_intent( + Self::make_payment_intent(&payment_id, merchant_id, connector, request), + storage_scheme, + ) .await { Ok(payment_intent) => Ok(payment_intent), @@ -111,7 +114,10 @@ impl GetTracker, api::VerifyRequest> for Paym }?; connector_response = match db - .insert_connector_response(PaymentCreate::make_connector_response(&payment_attempt)) + .insert_connector_response( + PaymentCreate::make_connector_response(&payment_attempt), + storage_scheme, + ) .await { Ok(connector_resp) => Ok(connector_resp), @@ -159,6 +165,7 @@ impl UpdateTracker, api::VerifyRequest> for PaymentM _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, _customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, api::VerifyRequest>, PaymentData)> where F: 'b + Send, @@ -178,6 +185,7 @@ impl UpdateTracker, api::VerifyRequest> for PaymentM shipping_address_id: None, billing_address_id: None, }, + storage_scheme, ) .await .map_err(|err| { @@ -230,6 +238,7 @@ where payment_attempt: &storage::PaymentAttempt, request: &Option, token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::VerifyRequest>, Option, diff --git a/crates/router/src/core/payments/operations/payment_response.rs b/crates/router/src/core/payments/operations/payment_response.rs index cb0f094e74..e14e13e9e3 100644 --- a/crates/router/src/core/payments/operations/payment_response.rs +++ b/crates/router/src/core/payments/operations/payment_response.rs @@ -36,6 +36,7 @@ impl PostUpdateTracker, types::PaymentsAuthorizeData response: Option< types::RouterData, >, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> where F: 'b + Send, @@ -44,7 +45,14 @@ impl PostUpdateTracker, types::PaymentsAuthorizeData payment_data.mandate_id = payment_data .mandate_id .or_else(|| router_data.request.mandate_id.clone()); - Ok(payment_response_ut(db, payment_id, payment_data, Some(router_data)).await?) + Ok(payment_response_ut( + db, + payment_id, + payment_data, + Some(router_data), + storage_scheme, + ) + .await?) } } @@ -58,11 +66,12 @@ impl PostUpdateTracker, types::PaymentsSyncData> for response: Option< types::RouterData, >, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> where F: 'b + Send, { - Ok(payment_response_ut(db, payment_id, payment_data, response).await?) + Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?) } } @@ -78,11 +87,12 @@ impl PostUpdateTracker, types::PaymentsCaptureData> response: Option< types::RouterData, >, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> where F: 'b + Send, { - Ok(payment_response_ut(db, payment_id, payment_data, response).await?) + Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?) } } @@ -96,11 +106,12 @@ impl PostUpdateTracker, types::PaymentsCancelData> f response: Option< types::RouterData, >, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> where F: 'b + Send, { - Ok(payment_response_ut(db, payment_id, payment_data, response).await?) + Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?) } } @@ -114,11 +125,12 @@ impl PostUpdateTracker, types::VerifyRequestData> fo response: Option< types::RouterData, >, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> where F: 'b + Send, { - Ok(payment_response_ut(db, payment_id, payment_data, response).await?) + Ok(payment_response_ut(db, payment_id, payment_data, response, storage_scheme).await?) } } @@ -127,6 +139,7 @@ async fn payment_response_ut( _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, response: Option>, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> { let router_data = response.ok_or(report!(errors::ApiErrorResponse::InternalServerError))?; let mut connector_response_data = None; @@ -156,7 +169,11 @@ async fn payment_response_ut( }; payment_data.payment_attempt = db - .update_payment_attempt(payment_data.payment_attempt, payment_attempt_update) + .update_payment_attempt( + payment_data.payment_attempt, + payment_attempt_update, + storage_scheme, + ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; @@ -180,11 +197,15 @@ async fn payment_response_ut( encoded_data: payment_data.connector_response.encoded_data.clone(), }; - db.update_connector_response(payment_data.connector_response, connector_response_update) - .await - .map_err(|error| { - error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) - })? + db.update_connector_response( + payment_data.connector_response, + connector_response_update, + storage_scheme, + ) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) + })? } None => payment_data.connector_response, }; @@ -201,7 +222,11 @@ async fn payment_response_ut( }; payment_data.payment_intent = db - .update_payment_intent(payment_data.payment_intent, payment_intent_update) + .update_payment_intent( + payment_data.payment_intent, + payment_intent_update, + storage_scheme, + ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; diff --git a/crates/router/src/core/payments/operations/payment_session.rs b/crates/router/src/core/payments/operations/payment_session.rs index b08cbb76b4..dcb4290220 100644 --- a/crates/router/src/core/payments/operations/payment_session.rs +++ b/crates/router/src/core/payments/operations/payment_session.rs @@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{self, helpers, PaymentData}, + payments::{self, helpers, operations, PaymentData}, }, db::StorageInterface, routes::AppState, @@ -38,6 +38,7 @@ impl GetTracker, api::PaymentsSessionRequest> _connector: Connector, request: &api::PaymentsSessionRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsSessionRequest>, PaymentData, @@ -50,14 +51,18 @@ impl GetTracker, api::PaymentsSessionRequest> let db = &*state.store; let mut payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) })?; let mut payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -81,6 +86,7 @@ impl GetTracker, api::PaymentsSessionRequest> payment_intent.shipping_address_id.as_deref(), ) .await?; + let billing_address = helpers::get_address_for_payment_request( db, None, @@ -97,6 +103,7 @@ impl GetTracker, api::PaymentsSessionRequest> &payment_intent.payment_id, &payment_intent.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -140,6 +147,7 @@ impl UpdateTracker, api::PaymentsSessionRequest> for _payment_id: &api::PaymentIdType, payment_data: PaymentData, _customer: Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsSessionRequest>, PaymentData, @@ -159,9 +167,7 @@ impl ValidateRequest for Paymen merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsSessionRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { //paymentid is already generated and should be sent in the request let given_payment_id = request @@ -171,9 +177,12 @@ impl ValidateRequest for Paymen Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(given_payment_id), - None, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(given_payment_id), + mandate_type: None, + storage_scheme: merchant_account.storage_scheme, + }, )) } } @@ -217,6 +226,7 @@ where _payment_attempt: &storage::PaymentAttempt, _request: &Option, _token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsSessionRequest>, Option, diff --git a/crates/router/src/core/payments/operations/payment_start.rs b/crates/router/src/core/payments/operations/payment_start.rs index 71afd4a6a2..9e95f42b1f 100644 --- a/crates/router/src/core/payments/operations/payment_start.rs +++ b/crates/router/src/core/payments/operations/payment_start.rs @@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, CustomResult, RouterResult, StorageErrorExt}, - payments::{helpers, CustomerDetails, PaymentAddress, PaymentData}, + payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData}, }, db::StorageInterface, routes::AppState, @@ -36,6 +36,7 @@ impl GetTracker, api::PaymentsStartRequest> f _connector: Connector, _request: &api::PaymentsStartRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsStartRequest>, PaymentData, @@ -49,14 +50,18 @@ impl GetTracker, api::PaymentsStartRequest> f .change_context(errors::ApiErrorResponse::PaymentNotFound)?; payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) })?; payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -95,6 +100,7 @@ impl GetTracker, api::PaymentsStartRequest> f &payment_intent.payment_id, &payment_intent.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -147,6 +153,7 @@ impl UpdateTracker, api::PaymentsStartRequest> for P _payment_id: &api::PaymentIdType, payment_data: PaymentData, _customer: Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsStartRequest>, PaymentData, @@ -166,9 +173,7 @@ impl ValidateRequest for PaymentS merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsStartRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let request_merchant_id = Some(&request.merchant_id[..]); helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id) @@ -181,9 +186,12 @@ impl ValidateRequest for PaymentS Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(payment_id), - None, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(payment_id), + mandate_type: None, + storage_scheme: merchant_account.storage_scheme, + }, )) } } @@ -227,6 +235,7 @@ where payment_attempt: &storage::PaymentAttempt, request: &Option, token: &Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsStartRequest>, Option, diff --git a/crates/router/src/core/payments/operations/payment_status.rs b/crates/router/src/core/payments/operations/payment_status.rs index cdb9ef3638..3034f24768 100644 --- a/crates/router/src/core/payments/operations/payment_status.rs +++ b/crates/router/src/core/payments/operations/payment_status.rs @@ -9,11 +9,15 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, ApiErrorResponse, RouterResult, StorageErrorExt}, - payments::{helpers, CustomerDetails, PaymentAddress, PaymentData}, + payments::{helpers, operations, CustomerDetails, PaymentAddress, PaymentData}, }, db::StorageInterface, routes::AppState, - types::{api, storage, Connector}, + types::{ + api, + storage::{self, enums}, + Connector, + }, utils::{self, OptionExt}, }; #[derive(Debug, Clone, Copy, PaymentOperation)] @@ -51,6 +55,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen _payment_id: &api::PaymentIdType, payment_data: PaymentData, _customer: Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData)> where F: 'b + Send, @@ -67,6 +72,7 @@ impl UpdateTracker, api::PaymentsRetrieveRequest> fo _payment_id: &api::PaymentIdType, payment_data: PaymentData, _customer: Option, + _storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsRetrieveRequest>, PaymentData, @@ -91,12 +97,21 @@ impl GetTracker, api::PaymentsRetrieveRequest _connector: Connector, request: &api::PaymentsRetrieveRequest, _mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRetrieveRequest>, PaymentData, Option, )> { - get_tracker_for_sync(payment_id, merchant_id, &*state.store, request, self).await + get_tracker_for_sync( + payment_id, + merchant_id, + &*state.store, + request, + self, + storage_scheme, + ) + .await } } @@ -110,6 +125,7 @@ async fn get_tracker_for_sync< db: &dyn StorageInterface, request: &api::PaymentsRetrieveRequest, operation: Op, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRetrieveRequest>, PaymentData, @@ -119,13 +135,13 @@ async fn get_tracker_for_sync< payment_attempt = match payment_id { api::PaymentIdType::PaymentIntentId(ref id) => { - db.find_payment_attempt_by_payment_id_merchant_id(id, merchant_id) + db.find_payment_attempt_by_payment_id_merchant_id(id, merchant_id, storage_scheme) } api::PaymentIdType::ConnectorTransactionId(ref id) => { - db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id) + db.find_payment_attempt_by_merchant_id_connector_txn_id(merchant_id, id, storage_scheme) } api::PaymentIdType::PaymentTxnId(ref id) => { - db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id) + db.find_payment_attempt_by_merchant_id_txn_id(merchant_id, id, storage_scheme) } } .await @@ -134,7 +150,7 @@ async fn get_tracker_for_sync< let payment_id_str = payment_attempt.payment_id.clone(); payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id_str, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id_str, merchant_id, storage_scheme) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; @@ -143,6 +159,7 @@ async fn get_tracker_for_sync< &payment_intent.payment_id, &payment_intent.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .change_context(errors::ApiErrorResponse::InternalServerError) @@ -168,7 +185,7 @@ async fn get_tracker_for_sync< )?; let refunds = db - .find_refund_by_payment_id_merchant_id(&payment_id_str, merchant_id) + .find_refund_by_payment_id_merchant_id(&payment_id_str, merchant_id, storage_scheme) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; @@ -204,9 +221,7 @@ impl ValidateRequest for Payme merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsRetrieveRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let request_merchant_id = request.merchant_id.as_deref(); helpers::validate_merchant_id(&merchant_account.merchant_id, request_merchant_id) @@ -217,9 +232,12 @@ impl ValidateRequest for Payme Ok(( Box::new(self), - &merchant_account.merchant_id, - request.resource_id.clone(), - None, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: request.resource_id.clone(), + mandate_type: None, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/payments/operations/payment_update.rs b/crates/router/src/core/payments/operations/payment_update.rs index b2aa0f9873..5d43e35c4e 100644 --- a/crates/router/src/core/payments/operations/payment_update.rs +++ b/crates/router/src/core/payments/operations/payment_update.rs @@ -9,7 +9,7 @@ use super::{BoxedOperation, Domain, GetTracker, Operation, UpdateTracker, Valida use crate::{ core::{ errors::{self, RouterResult, StorageErrorExt}, - payments::{self, helpers, CustomerDetails, PaymentAddress, PaymentData}, + payments::{self, helpers, operations, CustomerDetails, PaymentAddress, PaymentData}, utils as core_utils, }, db::StorageInterface, @@ -36,6 +36,7 @@ impl GetTracker, api::PaymentsRequest> for Pa _connector: Connector, request: &api::PaymentsRequest, mandate_type: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<( BoxedOperation<'a, F, api::PaymentsRequest>, PaymentData, @@ -58,7 +59,11 @@ impl GetTracker, api::PaymentsRequest> for Pa .await?; payment_attempt = db - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + merchant_id, + storage_scheme, + ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -78,7 +83,7 @@ impl GetTracker, api::PaymentsRequest> for Pa amount = request.amount.unwrap_or(payment_attempt.amount); payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id(&payment_id, merchant_id, storage_scheme) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) @@ -114,6 +119,7 @@ impl GetTracker, api::PaymentsRequest> for Pa &payment_intent.payment_id, &payment_intent.merchant_id, &payment_attempt.txn_id, + storage_scheme, ) .await .map_err(|error| { @@ -172,6 +178,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen _payment_id: &api::PaymentIdType, mut payment_data: PaymentData, customer: Option, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult<(BoxedOperation<'b, F, api::PaymentsRequest>, PaymentData)> where F: 'b + Send, @@ -200,6 +207,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen authentication_type: None, payment_method, }, + storage_scheme, ) .await .map_err(|error| { @@ -235,6 +243,7 @@ impl UpdateTracker, api::PaymentsRequest> for Paymen shipping_address_id: shipping_address, billing_address_id: billing_address, }, + storage_scheme, ) .await .map_err(|error| { @@ -258,9 +267,7 @@ impl ValidateRequest for PaymentUpdate merchant_account: &'a storage::MerchantAccount, ) -> RouterResult<( BoxedOperation<'b, F, api::PaymentsRequest>, - &'a str, - api::PaymentIdType, - Option, + operations::ValidateResult<'a>, )> { let given_payment_id = match &request.payment_id { Some(id_type) => Some( @@ -291,9 +298,12 @@ impl ValidateRequest for PaymentUpdate Ok(( Box::new(self), - &merchant_account.merchant_id, - api::PaymentIdType::PaymentIntentId(payment_id), - mandate_type, + operations::ValidateResult { + merchant_id: &merchant_account.merchant_id, + payment_id: api::PaymentIdType::PaymentIntentId(payment_id), + mandate_type, + storage_scheme: merchant_account.storage_scheme, + }, )) } } diff --git a/crates/router/src/core/refunds.rs b/crates/router/src/core/refunds.rs index c6d590265d..de1c44a989 100644 --- a/crates/router/src/core/refunds.rs +++ b/crates/router/src/core/refunds.rs @@ -38,6 +38,7 @@ pub async fn refund_create_core( .find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id( &req.payment_id, merchant_id, + merchant_account.storage_scheme, ) .await .change_context(errors::ApiErrorResponse::SuccessfulPaymentNotFound)?; @@ -56,7 +57,11 @@ pub async fn refund_create_core( )?; payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(&req.payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id( + &req.payment_id, + merchant_id, + merchant_account.storage_scheme, + ) .await .change_context(errors::ApiErrorResponse::PaymentNotFound)?; @@ -142,7 +147,11 @@ pub async fn trigger_refund_to_gateway( let response = state .store - .update_refund(refund.to_owned(), refund_update) + .update_refund( + refund.to_owned(), + refund_update, + merchant_account.storage_scheme, + ) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; Ok(response) @@ -162,13 +171,21 @@ pub async fn refund_retrieve_core( merchant_id = &merchant_account.merchant_id; refund = db - .find_refund_by_merchant_id_refund_id(merchant_id, refund_id.as_str()) + .find_refund_by_merchant_id_refund_id( + merchant_id, + refund_id.as_str(), + merchant_account.storage_scheme, + ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?; let payment_id = refund.payment_id.as_str(); payment_intent = db - .find_payment_intent_by_payment_id_merchant_id(payment_id, merchant_id) + .find_payment_intent_by_payment_id_merchant_id( + payment_id, + merchant_id, + merchant_account.storage_scheme, + ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; @@ -177,6 +194,7 @@ pub async fn refund_retrieve_core( &refund.transaction_id, payment_id, merchant_id, + merchant_account.storage_scheme, ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound))?; @@ -251,7 +269,11 @@ pub async fn sync_refund_with_gateway( let response = state .store - .update_refund(refund.to_owned(), refund_update) + .update_refund( + refund.to_owned(), + refund_update, + merchant_account.storage_scheme, + ) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; Ok(response) @@ -266,7 +288,11 @@ pub async fn refund_update_core( req: refunds::RefundRequest, ) -> RouterResponse { let refund = db - .find_refund_by_merchant_id_refund_id(&merchant_account.merchant_id, refund_id) + .find_refund_by_merchant_id_refund_id( + &merchant_account.merchant_id, + refund_id, + merchant_account.storage_scheme, + ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?; @@ -276,6 +302,7 @@ pub async fn refund_update_core( storage::RefundUpdate::MetadataUpdate { metadata: req.metadata, }, + merchant_account.storage_scheme, ) .await .change_context(errors::ApiErrorResponse::InternalServerError)?; @@ -323,6 +350,7 @@ pub async fn validate_and_create_refund( &payment_intent.payment_id, &merchant_account.merchant_id, &refund_id, + merchant_account.storage_scheme, ) .await .change_context(errors::ApiErrorResponse::InternalServerError)? @@ -338,6 +366,7 @@ pub async fn validate_and_create_refund( .find_refund_by_merchant_id_transaction_id( &merchant_account.merchant_id, connecter_transaction_id, + merchant_account.storage_scheme, ) .await .change_context(errors::ApiErrorResponse::RefundNotFound) @@ -371,9 +400,12 @@ pub async fn validate_and_create_refund( refund_amount, ); - refund = db.insert_refund(refund_create_req).await.map_err(|error| { - error.to_duplicate_response(errors::ApiErrorResponse::DuplicateRefundRequest) - })?; + refund = db + .insert_refund(refund_create_req, merchant_account.storage_scheme) + .await + .map_err(|error| { + error.to_duplicate_response(errors::ApiErrorResponse::DuplicateRefundRequest) + })?; schedule_refund_execution( state, refund, @@ -548,11 +580,20 @@ pub async fn sync_refund_with_gateway_workflow( refund_tracker.tracking_data ) })?; + + let merchant_account = db + .find_merchant_account_by_merchant_id(&refund_core.merchant_id) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + })?; + // FIXME we actually don't use this? let _refund = db .find_refund_by_internal_reference_id_merchant_id( &refund_core.refund_internal_reference_id, &refund_core.merchant_id, + merchant_account.storage_scheme, ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?; @@ -592,41 +633,53 @@ pub async fn trigger_refund_execute_workflow( refund_tracker.tracking_data ) })?; + + let merchant_account = db + .find_merchant_account_by_merchant_id(&refund_core.merchant_id) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + })?; + let refund = db .find_refund_by_internal_reference_id_merchant_id( &refund_core.refund_internal_reference_id, &refund_core.merchant_id, + merchant_account.storage_scheme, ) .await .map_err(|error| error.to_not_found_response(errors::ApiErrorResponse::RefundNotFound))?; match (&refund.sent_to_gateway, &refund.refund_status) { //FIXME: Conversion should come from trait (false, enums::RefundStatus::Pending) => { + let merchant_account = db + .find_merchant_account_by_merchant_id(&refund.merchant_id) + .await + .map_err(|error| { + error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + })?; + let payment_attempt = db .find_payment_attempt_by_transaction_id_payment_id_merchant_id( &refund.transaction_id, &refund_core.payment_id, &refund.merchant_id, - ) - .await - .map_err(|error| { - error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) - })?; - let payment_intent = db - .find_payment_intent_by_payment_id_merchant_id( - &payment_attempt.payment_id, - &refund.merchant_id, + merchant_account.storage_scheme, ) .await .map_err(|error| { error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) })?; - let merchant_account = db - .find_merchant_account_by_merchant_id(&refund.merchant_id) + let payment_intent = db + .find_payment_intent_by_payment_id_merchant_id( + &payment_attempt.payment_id, + &refund.merchant_id, + merchant_account.storage_scheme, + ) .await .map_err(|error| { - error.to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound) + error.to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) })?; //trigger refund request to gateway diff --git a/crates/router/src/core/refunds/validator.rs b/crates/router/src/core/refunds/validator.rs index 586f6b0949..056a4ef058 100644 --- a/crates/router/src/core/refunds/validator.rs +++ b/crates/router/src/core/refunds/validator.rs @@ -95,9 +95,10 @@ pub async fn validate_uniqueness_of_refund_id_against_merchant_id( payment_id: &str, merchant_id: &str, refund_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> RouterResult> { let refund = db - .find_refund_by_merchant_id_refund_id(merchant_id, refund_id) + .find_refund_by_merchant_id_refund_id(merchant_id, refund_id, storage_scheme) .await; logger::debug!(?refund); match refund { diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index 26ce838ee1..522ac70a99 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -16,6 +16,7 @@ pub trait AddressInterface { &self, address: AddressNew, ) -> CustomResult; + async fn find_address(&self, address_id: &str) -> CustomResult; } diff --git a/crates/router/src/db/connector_response.rs b/crates/router/src/db/connector_response.rs index 0cd8091ff6..cc68e2b9ad 100644 --- a/crates/router/src/db/connector_response.rs +++ b/crates/router/src/db/connector_response.rs @@ -2,7 +2,7 @@ use super::MockDb; use crate::{ connection::pg_connection, core::errors::{self, CustomResult}, - types::storage::{ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, + types::storage::{enums, ConnectorResponse, ConnectorResponseNew, ConnectorResponseUpdate}, }; #[async_trait::async_trait] @@ -10,17 +10,22 @@ pub trait ConnectorResponseInterface { async fn insert_connector_response( &self, connector_response: ConnectorResponseNew, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; + async fn find_connector_response_by_payment_id_merchant_id_txn_id( &self, payment_id: &str, merchant_id: &str, txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; + async fn update_connector_response( &self, this: ConnectorResponse, payment_attempt: ConnectorResponseUpdate, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; } @@ -29,6 +34,7 @@ impl ConnectorResponseInterface for super::Store { async fn insert_connector_response( &self, connector_response: ConnectorResponseNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; connector_response.insert(&conn).await @@ -39,6 +45,7 @@ impl ConnectorResponseInterface for super::Store { payment_id: &str, merchant_id: &str, txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; ConnectorResponse::find_by_payment_id_and_merchant_id_transaction_id( @@ -54,6 +61,7 @@ impl ConnectorResponseInterface for super::Store { &self, this: ConnectorResponse, connector_response_update: ConnectorResponseUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; this.update(&conn, connector_response_update).await @@ -65,6 +73,7 @@ impl ConnectorResponseInterface for MockDb { async fn insert_connector_response( &self, new: ConnectorResponseNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut connector_response = self.connector_response.lock().await; let response = ConnectorResponse { @@ -88,6 +97,7 @@ impl ConnectorResponseInterface for MockDb { _payment_id: &str, _merchant_id: &str, _txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -96,6 +106,7 @@ impl ConnectorResponseInterface for MockDb { &self, this: ConnectorResponse, connector_response_update: ConnectorResponseUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut connector_response = self.connector_response.lock().await; let response = connector_response diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 3390c0e463..435e9e88c3 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -5,7 +5,7 @@ use super::MockDb; use crate::{ connection::pg_connection, core::errors::{self, CustomResult, DatabaseError, StorageError}, - types::storage::{MerchantAccount, MerchantAccountNew, MerchantAccountUpdate}, + types::storage::{enums, MerchantAccount, MerchantAccountNew, MerchantAccountUpdate}, }; #[async_trait::async_trait] @@ -122,6 +122,7 @@ impl MerchantAccountInterface for MockDb { sub_merchants_enabled: merchant_account.sub_merchants_enabled, parent_merchant_id: merchant_account.parent_merchant_id, publishable_key: merchant_account.publishable_key, + storage_scheme: enums::MerchantStorageScheme::PostgresOnly, }; accounts.push(account.clone()); Ok(account) diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 73c31c71f3..322891c6f0 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -1,7 +1,7 @@ use super::MockDb; use crate::{ core::errors::{self, CustomResult}, - types::storage::{PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, + types::storage::{enums, PaymentAttempt, PaymentAttemptNew, PaymentAttemptUpdate}, }; #[async_trait::async_trait] @@ -9,18 +9,21 @@ pub trait PaymentAttemptInterface { async fn insert_payment_attempt( &self, payment_attempt: PaymentAttemptNew, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn update_payment_attempt( &self, this: PaymentAttempt, payment_attempt: PaymentAttemptUpdate, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_attempt_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id( @@ -28,24 +31,28 @@ pub trait PaymentAttemptInterface { transaction_id: &str, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_attempt_by_merchant_id_connector_txn_id( &self, merchant_id: &str, connector_txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_attempt_by_merchant_id_txn_id( &self, merchant_id: &str, txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; } @@ -56,7 +63,7 @@ mod storage { connection::pg_connection, core::errors::{self, CustomResult}, services::Store, - types::storage::payment_attempt::*, + types::storage::{enums, payment_attempt::*}, }; #[async_trait::async_trait] @@ -64,6 +71,7 @@ mod storage { async fn insert_payment_attempt( &self, payment_attempt: PaymentAttemptNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; payment_attempt.insert_diesel(&conn).await @@ -73,6 +81,7 @@ mod storage { &self, this: PaymentAttempt, payment_attempt: PaymentAttemptUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; this.update(&conn, payment_attempt).await @@ -82,6 +91,7 @@ mod storage { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await @@ -92,6 +102,7 @@ mod storage { transaction_id: &str, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; PaymentAttempt::find_by_transaction_id_payment_id_merchant_id( @@ -107,6 +118,7 @@ mod storage { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id( @@ -121,6 +133,7 @@ mod storage { &self, merchant_id: &str, connector_txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; // TODO: update logic to lookup all payment attempts for an intent @@ -137,6 +150,7 @@ mod storage { &self, merchant_id: &str, txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; @@ -151,6 +165,7 @@ impl PaymentAttemptInterface for MockDb { &self, _merchant_id: &str, _txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -159,6 +174,7 @@ impl PaymentAttemptInterface for MockDb { &self, _merchant_id: &str, _connector_txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -167,6 +183,7 @@ impl PaymentAttemptInterface for MockDb { async fn insert_payment_attempt( &self, payment_attempt: PaymentAttemptNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut payment_attempts = self.payment_attempts.lock().await; let id = payment_attempts.len() as i32; @@ -211,6 +228,7 @@ impl PaymentAttemptInterface for MockDb { &self, this: PaymentAttempt, payment_attempt: PaymentAttemptUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut payment_attempts = self.payment_attempts.lock().await; @@ -228,6 +246,7 @@ impl PaymentAttemptInterface for MockDb { &self, _payment_id: &str, _merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -237,6 +256,7 @@ impl PaymentAttemptInterface for MockDb { _transaction_id: &str, _payment_id: &str, _merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -245,6 +265,7 @@ impl PaymentAttemptInterface for MockDb { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let payment_attempts = self.payment_attempts.lock().await; @@ -280,69 +301,142 @@ mod storage { async fn insert_payment_attempt( &self, payment_attempt: PaymentAttemptNew, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let key = format!( - "{}_{}", - payment_attempt.payment_id, payment_attempt.merchant_id - ); - // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id - // Check for database presence as well Maybe use a read replica here ? - let created_attempt = PaymentAttempt { - id: 0i32, - payment_id: payment_attempt.payment_id.clone(), - merchant_id: payment_attempt.merchant_id.clone(), - txn_id: payment_attempt.txn_id.clone(), - status: payment_attempt.status, - amount: payment_attempt.amount, - currency: payment_attempt.currency, - save_to_locker: payment_attempt.save_to_locker, - connector: payment_attempt.connector.clone(), - error_message: payment_attempt.error_message.clone(), - offer_amount: payment_attempt.offer_amount, - surcharge_amount: payment_attempt.surcharge_amount, - tax_amount: payment_attempt.tax_amount, - payment_method_id: payment_attempt.payment_method_id.clone(), - payment_method: payment_attempt.payment_method, - payment_flow: payment_attempt.payment_flow, - redirect: payment_attempt.redirect, - connector_transaction_id: payment_attempt.connector_transaction_id.clone(), - capture_method: payment_attempt.capture_method, - capture_on: payment_attempt.capture_on, - confirm: payment_attempt.confirm, - authentication_type: payment_attempt.authentication_type, - created_at: payment_attempt.created_at.unwrap_or_else(date_time::now), - modified_at: payment_attempt.created_at.unwrap_or_else(date_time::now), - last_synced: payment_attempt.last_synced, - amount_to_capture: payment_attempt.amount_to_capture, - cancellation_reason: payment_attempt.cancellation_reason.clone(), - mandate_id: payment_attempt.mandate_id.clone(), - browser_info: payment_attempt.browser_info.clone(), - }; - // TODO: Add a proper error for serialization failure - let redis_value = serde_json::to_string(&created_attempt) - .into_report() - .change_context(errors::StorageError::KVError)?; - match self - .redis_conn - .pool - .hsetnx::(&key, "pa", &redis_value) - .await - { - Ok(0) => Err(errors::StorageError::DuplicateValue(format!( - "Payment Attempt already exists for payment_id: {}", - key - ))) - .into_report(), - Ok(1) => { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { let conn = pg_connection(&self.master_pool).await; - let query = payment_attempt - .insert_diesel(&conn) + payment_attempt.insert_diesel(&conn).await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!( + "{}_{}", + payment_attempt.payment_id, payment_attempt.merchant_id + ); + // TODO: need to add an application generated payment attempt id to distinguish between multiple attempts for the same payment id + // Check for database presence as well Maybe use a read replica here ? + let created_attempt = PaymentAttempt { + id: 0i32, + payment_id: payment_attempt.payment_id.clone(), + merchant_id: payment_attempt.merchant_id.clone(), + txn_id: payment_attempt.txn_id.clone(), + status: payment_attempt.status, + amount: payment_attempt.amount, + currency: payment_attempt.currency, + save_to_locker: payment_attempt.save_to_locker, + connector: payment_attempt.connector.clone(), + error_message: payment_attempt.error_message.clone(), + offer_amount: payment_attempt.offer_amount, + surcharge_amount: payment_attempt.surcharge_amount, + tax_amount: payment_attempt.tax_amount, + payment_method_id: payment_attempt.payment_method_id.clone(), + payment_method: payment_attempt.payment_method, + payment_flow: payment_attempt.payment_flow, + redirect: payment_attempt.redirect, + connector_transaction_id: payment_attempt.connector_transaction_id.clone(), + capture_method: payment_attempt.capture_method, + capture_on: payment_attempt.capture_on, + confirm: payment_attempt.confirm, + authentication_type: payment_attempt.authentication_type, + created_at: payment_attempt.created_at.unwrap_or_else(date_time::now), + modified_at: payment_attempt.created_at.unwrap_or_else(date_time::now), + last_synced: payment_attempt.last_synced, + amount_to_capture: payment_attempt.amount_to_capture, + cancellation_reason: payment_attempt.cancellation_reason.clone(), + mandate_id: payment_attempt.mandate_id.clone(), + browser_info: payment_attempt.browser_info.clone(), + }; + // TODO: Add a proper error for serialization failure + let redis_value = serde_json::to_string(&created_attempt) + .into_report() + .change_context(errors::StorageError::KVError)?; + match self + .redis_conn + .pool + .hsetnx::(&key, "pa", &redis_value) + .await + { + Ok(0) => Err(errors::StorageError::DuplicateValue(format!( + "Payment Attempt already exists for payment_id: {}", + key + ))) + .into_report(), + Ok(1) => { + let conn = pg_connection(&self.master_pool).await; + let query = payment_attempt + .insert_diesel_query(&conn) + .await + .change_context(errors::StorageError::KVError)?; + let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( + crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { + merchant_id: &created_attempt.merchant_id, + payment_id: &created_attempt.payment_id, + }, + self.config.drainer_num_partitions, + )); + self.redis_conn + .stream_append_entry( + &stream_name, + &RedisEntryId::AutoGeneratedID, + query.to_field_value_pairs(), + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(created_attempt) + } + Ok(i) => Err(errors::StorageError::KVError) + .into_report() + .attach_printable_lazy(|| { + format!("Invalid response for HSETNX: {}", i) + }), + Err(er) => Err(er) + .into_report() + .change_context(errors::StorageError::KVError), + } + } + } + } + + async fn update_payment_attempt( + &self, + this: PaymentAttempt, + payment_attempt: PaymentAttemptUpdate, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + this.update(&conn, payment_attempt).await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", this.payment_id, this.merchant_id); + + let updated_attempt = payment_attempt.clone().apply_changeset(this.clone()); + // Check for database presence as well Maybe use a read replica here ? + // TODO: Add a proper error for serialization failure + let redis_value = serde_json::to_string(&updated_attempt) + .into_report() + .change_context(errors::StorageError::KVError)?; + let updated_attempt = self + .redis_conn + .pool + .hset::(&key, ("pa", redis_value)) + .await + .map(|_| updated_attempt) + .into_report() + .change_context(errors::StorageError::KVError)?; + + let conn = pg_connection(&self.master_pool).await; + let query = this + .update_query(&conn, payment_attempt) .await .change_context(errors::StorageError::KVError)?; let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &created_attempt.merchant_id, - payment_id: &created_attempt.payment_id, + merchant_id: &updated_attempt.merchant_id, + payment_id: &updated_attempt.payment_id, }, self.config.drainer_num_partitions, )); @@ -354,80 +448,40 @@ mod storage { ) .await .change_context(errors::StorageError::KVError)?; - Ok(created_attempt) + Ok(updated_attempt) } - Ok(i) => Err(errors::StorageError::KVError) - .into_report() - .attach_printable_lazy(|| format!("Invalid response for HSETNX: {}", i)), - Err(er) => Err(er) - .into_report() - .change_context(errors::StorageError::KVError), } } - async fn update_payment_attempt( - &self, - this: PaymentAttempt, - payment_attempt: PaymentAttemptUpdate, - ) -> CustomResult { - let key = format!("{}_{}", this.payment_id, this.merchant_id); - - let updated_attempt = payment_attempt.clone().apply_changeset(this.clone()); - // Check for database presence as well Maybe use a read replica here ? - // TODO: Add a proper error for serialization failure - let redis_value = serde_json::to_string(&updated_attempt) - .into_report() - .change_context(errors::StorageError::KVError)?; - let updated_attempt = self - .redis_conn - .pool - .hset::(&key, ("pa", redis_value)) - .await - .map(|_| updated_attempt) - .into_report() - .change_context(errors::StorageError::KVError)?; - - let conn = pg_connection(&self.master_pool).await; - let query = this - .update(&conn, payment_attempt) - .await - .change_context(errors::StorageError::KVError)?; - let stream_name = self.drainer_stream(&PaymentAttempt::shard_key( - crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &updated_attempt.merchant_id, - payment_id: &updated_attempt.payment_id, - }, - self.config.drainer_num_partitions, - )); - self.redis_conn - .stream_append_entry( - &stream_name, - &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), - ) - .await - .change_context(errors::StorageError::KVError)?; - Ok(updated_attempt) - } - async fn find_payment_attempt_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let key = format!("{}_{}", payment_id, merchant_id); - self.redis_conn - .pool - .hget::(key, "pa") - .await - .into_report() - .change_context(errors::StorageError::KVError) - .and_then(|redis_resp| { - serde_json::from_str::(&redis_resp) + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) + .await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", payment_id, merchant_id); + self.redis_conn + .pool + .hget::(key, "pa") + .await .into_report() .change_context(errors::StorageError::KVError) - }) - // Check for database presence as well Maybe use a read replica here ? + .and_then(|redis_resp| { + serde_json::from_str::(&redis_resp) + .into_report() + .change_context(errors::StorageError::KVError) + }) + // Check for database presence as well Maybe use a read replica here ? + } + } } async fn find_payment_attempt_by_transaction_id_payment_id_merchant_id( @@ -435,54 +489,92 @@ mod storage { transaction_id: &str, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { // We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now - self.find_payment_attempt_by_payment_id_merchant_id(payment_id, merchant_id) - .await - .and_then(|attempt| { - if attempt.connector_transaction_id.as_deref() == Some(transaction_id) { - Ok(attempt) - } else { - Err(errors::StorageError::ValueNotFound(format!( - "Successful payment attempt does not exist for {}_{}", - payment_id, merchant_id - ))) - .into_report() - } - }) + self.find_payment_attempt_by_payment_id_merchant_id( + payment_id, + merchant_id, + storage_scheme, + ) + .await + .and_then(|attempt| { + if attempt.connector_transaction_id.as_deref() == Some(transaction_id) { + Ok(attempt) + } else { + Err(errors::StorageError::ValueNotFound(format!( + "Successful payment attempt does not exist for {}_{}", + payment_id, merchant_id + ))) + .into_report() + } + }) } async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - self.find_payment_attempt_by_payment_id_merchant_id(payment_id, merchant_id) - .await - .and_then(|attempt| match attempt.status { - enums::AttemptStatus::Charged => Ok(attempt), - _ => Err(errors::StorageError::ValueNotFound(format!( - "Successful payment attempt does not exist for {}_{}", - payment_id, merchant_id - ))) - .into_report(), - }) + self.find_payment_attempt_by_payment_id_merchant_id( + payment_id, + merchant_id, + storage_scheme, + ) + .await + .and_then(|attempt| match attempt.status { + enums::AttemptStatus::Charged => Ok(attempt), + _ => Err(errors::StorageError::ValueNotFound(format!( + "Successful payment attempt does not exist for {}_{}", + payment_id, merchant_id + ))) + .into_report(), + }) } async fn find_payment_attempt_by_merchant_id_connector_txn_id( &self, - _merchant_id: &str, - _connector_txn_id: &str, + merchant_id: &str, + connector_txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - Err(errors::StorageError::KVError).into_report() + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + // TODO: update logic to lookup all payment attempts for an intent + // and apply filter logic on top of them to get the desired one. + PaymentAttempt::find_by_merchant_id_connector_txn_id( + &conn, + merchant_id, + connector_txn_id, + ) + .await + } + + enums::MerchantStorageScheme::RedisKv => { + Err(errors::StorageError::KVError).into_report() + } + } } async fn find_payment_attempt_by_merchant_id_txn_id( &self, - _merchant_id: &str, - _txn_id: &str, + merchant_id: &str, + txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - Err(errors::StorageError::KVError).into_report() + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id) + .await + } + + enums::MerchantStorageScheme::RedisKv => { + Err(errors::StorageError::KVError).into_report() + } + } } } } diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index bbb2c4f776..71ea778b26 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -3,7 +3,7 @@ use crate::{ core::errors::{self, CustomResult}, types::{ api, - storage::{PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, + storage::{enums, PaymentIntent, PaymentIntentNew, PaymentIntentUpdate}, }, }; @@ -13,23 +13,27 @@ pub trait PaymentIntentInterface { &self, this: PaymentIntent, payment_intent: PaymentIntentUpdate, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn insert_payment_intent( &self, new: PaymentIntentNew, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_payment_intent_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn filter_payment_intent_by_constraints( &self, merchant_id: &str, pc: &api::PaymentListConstraints, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError>; } @@ -45,7 +49,10 @@ mod storage { connection::pg_connection, core::errors::{self, CustomResult}, services::Store, - types::{api, storage::payment_intent::*}, + types::{ + api, + storage::{enums, payment_intent::*}, + }, utils::storage_partitioning::KvStorePartition, }; @@ -54,56 +61,129 @@ mod storage { async fn insert_payment_intent( &self, new: PaymentIntentNew, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let key = format!("{}_{}", new.payment_id, new.merchant_id); - let created_intent = PaymentIntent { - id: 0i32, - payment_id: new.payment_id.clone(), - merchant_id: new.merchant_id.clone(), - status: new.status, - amount: new.amount, - currency: new.currency, - amount_captured: new.amount_captured, - customer_id: new.customer_id.clone(), - description: new.description.clone(), - return_url: new.return_url.clone(), - metadata: new.metadata.clone(), - connector_id: new.connector_id.clone(), - shipping_address_id: new.shipping_address_id.clone(), - billing_address_id: new.billing_address_id.clone(), - statement_descriptor_name: new.statement_descriptor_name.clone(), - statement_descriptor_suffix: new.statement_descriptor_suffix.clone(), - created_at: new.created_at.unwrap_or_else(date_time::now), - modified_at: new.created_at.unwrap_or_else(date_time::now), - last_synced: new.last_synced, - setup_future_usage: new.setup_future_usage, - off_session: new.off_session, - client_secret: new.client_secret.clone(), - }; - // TODO: Add a proper error for serialization failure - let redis_value = serde_json::to_string(&created_intent) - .into_report() - .change_context(errors::StorageError::KVError)?; - match self - .redis_conn - .pool - .hsetnx::(&key, "pi", &redis_value) - .await - { - Ok(0) => Err(errors::StorageError::DuplicateValue(format!( - "Payment Intent already exists for payment_id: {key}" - ))) - .into_report(), - Ok(1) => { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { let conn = pg_connection(&self.master_pool).await; - let query = new - .insert_diesel(&conn) + new.insert_diesel(&conn).await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", new.payment_id, new.merchant_id); + let created_intent = PaymentIntent { + id: 0i32, + payment_id: new.payment_id.clone(), + merchant_id: new.merchant_id.clone(), + status: new.status, + amount: new.amount, + currency: new.currency, + amount_captured: new.amount_captured, + customer_id: new.customer_id.clone(), + description: new.description.clone(), + return_url: new.return_url.clone(), + metadata: new.metadata.clone(), + connector_id: new.connector_id.clone(), + shipping_address_id: new.shipping_address_id.clone(), + billing_address_id: new.billing_address_id.clone(), + statement_descriptor_name: new.statement_descriptor_name.clone(), + statement_descriptor_suffix: new.statement_descriptor_suffix.clone(), + created_at: new.created_at.unwrap_or_else(date_time::now), + modified_at: new.created_at.unwrap_or_else(date_time::now), + last_synced: new.last_synced, + setup_future_usage: new.setup_future_usage, + off_session: new.off_session, + client_secret: new.client_secret.clone(), + }; + // TODO: Add a proper error for serialization failure + let redis_value = serde_json::to_string(&created_intent) + .into_report() + .change_context(errors::StorageError::KVError)?; + match self + .redis_conn + .pool + .hsetnx::(&key, "pi", &redis_value) + .await + { + Ok(0) => Err(errors::StorageError::DuplicateValue(format!( + "Payment Intent already exists for payment_id: {key}" + ))) + .into_report(), + Ok(1) => { + let conn = pg_connection(&self.master_pool).await; + let query = new + .insert_diesel_query(&conn) + .await + .change_context(errors::StorageError::KVError)?; + let stream_name = self.drainer_stream(&PaymentIntent::shard_key( + crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { + merchant_id: &created_intent.merchant_id, + payment_id: &created_intent.payment_id, + }, + self.config.drainer_num_partitions, + )); + self.redis_conn + .stream_append_entry( + &stream_name, + &RedisEntryId::AutoGeneratedID, + query.to_field_value_pairs(), + ) + .await + .change_context(errors::StorageError::KVError)?; + Ok(created_intent) + } + Ok(i) => Err(errors::StorageError::KVError) + .into_report() + .attach_printable_lazy(|| { + format!("Invalid response for HSETNX: {}", i) + }), + Err(er) => Err(er) + .into_report() + .change_context(errors::StorageError::KVError), + } + } + } + } + + async fn update_payment_intent( + &self, + this: PaymentIntent, + payment_intent: PaymentIntentUpdate, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + this.update(&conn, payment_intent).await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", this.payment_id, this.merchant_id); + + let updated_intent = payment_intent.clone().apply_changeset(this.clone()); + // Check for database presence as well Maybe use a read replica here ? + // TODO: Add a proper error for serialization failure + let redis_value = serde_json::to_string(&updated_intent) + .into_report() + .change_context(errors::StorageError::KVError)?; + let updated_intent = self + .redis_conn + .pool + .hset::(&key, ("pi", redis_value)) + .await + .map(|_| updated_intent) + .into_report() + .change_context(errors::StorageError::KVError)?; + + let conn = pg_connection(&self.master_pool).await; + let query = this + .update_query(&conn, payment_intent) .await .change_context(errors::StorageError::KVError)?; let stream_name = self.drainer_stream(&PaymentIntent::shard_key( crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &created_intent.merchant_id, - payment_id: &created_intent.payment_id, + merchant_id: &updated_intent.merchant_id, + payment_id: &updated_intent.payment_id, }, self.config.drainer_num_partitions, )); @@ -115,94 +195,64 @@ mod storage { ) .await .change_context(errors::StorageError::KVError)?; - Ok(created_intent) + Ok(updated_intent) } - Ok(i) => Err(errors::StorageError::KVError) - .into_report() - .attach_printable_lazy(|| format!("Invalid response for HSETNX: {}", i)), - Err(er) => Err(er) - .into_report() - .change_context(errors::StorageError::KVError), } } - async fn update_payment_intent( - &self, - this: PaymentIntent, - payment_intent: PaymentIntentUpdate, - ) -> CustomResult { - let key = format!("{}_{}", this.payment_id, this.merchant_id); - - let updated_intent = payment_intent.clone().apply_changeset(this.clone()); - // Check for database presence as well Maybe use a read replica here ? - // TODO: Add a proper error for serialization failure - let redis_value = serde_json::to_string(&updated_intent) - .into_report() - .change_context(errors::StorageError::KVError)?; - let updated_intent = self - .redis_conn - .pool - .hset::(&key, ("pi", redis_value)) - .await - .map(|_| updated_intent) - .into_report() - .change_context(errors::StorageError::KVError)?; - - let conn = pg_connection(&self.master_pool).await; - let query = this - .update(&conn, payment_intent) - .await - .change_context(errors::StorageError::KVError)?; - let stream_name = self.drainer_stream(&PaymentIntent::shard_key( - crate::utils::storage_partitioning::PartitionKey::MerchantIdPaymentId { - merchant_id: &updated_intent.merchant_id, - payment_id: &updated_intent.payment_id, - }, - self.config.drainer_num_partitions, - )); - self.redis_conn - .stream_append_entry( - &stream_name, - &RedisEntryId::AutoGeneratedID, - query.to_field_value_pairs(), - ) - .await - .change_context(errors::StorageError::KVError)?; - Ok(updated_intent) - } - async fn find_payment_intent_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let key = format!("{}_{}", payment_id, merchant_id); - self.redis_conn - .pool - .hget::(&key, "pi") - .await - .map_err(|err| match err.kind() { - RedisErrorKind::NotFound => errors::StorageError::ValueNotFound(format!( - "Payment Intent does not exist for {}", - key - )), - _ => errors::StorageError::KVError, - }) - .into_report() - .and_then(|redis_resp| { - serde_json::from_str::(&redis_resp) + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) + .await + } + + enums::MerchantStorageScheme::RedisKv => { + let key = format!("{}_{}", payment_id, merchant_id); + self.redis_conn + .pool + .hget::(&key, "pi") + .await + .map_err(|err| match err.kind() { + RedisErrorKind::NotFound => errors::StorageError::ValueNotFound( + format!("Payment Intent does not exist for {}", key), + ), + _ => errors::StorageError::KVError, + }) .into_report() - .change_context(errors::StorageError::KVError) - }) - // Check for database presence as well Maybe use a read replica here ? + .and_then(|redis_resp| { + serde_json::from_str::(&redis_resp) + .into_report() + .change_context(errors::StorageError::KVError) + }) + // Check for database presence as well Maybe use a read replica here ? + } + } } + async fn filter_payment_intent_by_constraints( &self, - _merchant_id: &str, - _pc: &api::PaymentListConstraints, + merchant_id: &str, + pc: &api::PaymentListConstraints, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { - //TODO: Implement this - Err(errors::StorageError::KVError.into()) + match storage_scheme { + enums::MerchantStorageScheme::PostgresOnly => { + let conn = pg_connection(&self.master_pool).await; + PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await + } + + enums::MerchantStorageScheme::RedisKv => { + //TODO: Implement this + Err(errors::StorageError::KVError.into()) + } + } } } } @@ -214,7 +264,10 @@ mod storage { connection::pg_connection, core::errors::{self, CustomResult}, services::Store, - types::{api, storage::payment_intent::*}, + types::{ + api, + storage::{enums, payment_intent::*}, + }, }; #[async_trait::async_trait] @@ -222,6 +275,7 @@ mod storage { async fn insert_payment_intent( &self, new: PaymentIntentNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; new.insert_diesel(&conn).await @@ -231,6 +285,7 @@ mod storage { &self, this: PaymentIntent, payment_intent: PaymentIntentUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; this.update(&conn, payment_intent).await @@ -240,6 +295,7 @@ mod storage { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await @@ -249,6 +305,7 @@ mod storage { &self, merchant_id: &str, pc: &api::PaymentListConstraints, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let conn = pg_connection(&self.master_pool).await; PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await @@ -262,6 +319,7 @@ impl PaymentIntentInterface for MockDb { &self, _merchant_id: &str, _pc: &api::PaymentListConstraints, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { todo!() } @@ -270,6 +328,7 @@ impl PaymentIntentInterface for MockDb { async fn insert_payment_intent( &self, new: PaymentIntentNew, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut payment_intents = self.payment_intents.lock().await; let time = common_utils::date_time::now(); @@ -305,6 +364,7 @@ impl PaymentIntentInterface for MockDb { &self, this: PaymentIntent, update: PaymentIntentUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let mut payment_intents = self.payment_intents.lock().await; let payment_intent = payment_intents @@ -319,6 +379,7 @@ impl PaymentIntentInterface for MockDb { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let payment_intents = self.payment_intents.lock().await; diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index 9b03b5a3ff..d1659caf33 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -4,7 +4,7 @@ use super::MockDb; use crate::{ connection::pg_connection, core::errors::{self, CustomResult, DatabaseError, StorageError}, - types::storage::{Refund, RefundNew, RefundUpdate}, + types::storage::{enums, Refund, RefundNew, RefundUpdate}, }; #[async_trait::async_trait] @@ -13,12 +13,14 @@ pub trait RefundInterface { &self, internal_reference_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_refund_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError>; // async fn find_refund_by_payment_id_merchant_id_refund_id( @@ -32,21 +34,28 @@ pub trait RefundInterface { &self, merchant_id: &str, refund_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn update_refund( &self, this: Refund, refund: RefundUpdate, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult; async fn find_refund_by_merchant_id_transaction_id( &self, merchant_id: &str, txn_id: &str, + storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError>; - async fn insert_refund(&self, new: RefundNew) -> CustomResult; + async fn insert_refund( + &self, + new: RefundNew, + storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult; } #[async_trait::async_trait] @@ -55,13 +64,18 @@ impl RefundInterface for super::Store { &self, internal_reference_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; Refund::find_by_internal_reference_id_merchant_id(&conn, internal_reference_id, merchant_id) .await } - async fn insert_refund(&self, new: RefundNew) -> CustomResult { + async fn insert_refund( + &self, + new: RefundNew, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; new.insert(&conn).await } @@ -69,6 +83,7 @@ impl RefundInterface for super::Store { &self, merchant_id: &str, txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let conn = pg_connection(&self.master_pool).await; Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await @@ -78,6 +93,7 @@ impl RefundInterface for super::Store { &self, this: Refund, refund: RefundUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; this.update(&conn, refund).await @@ -87,6 +103,7 @@ impl RefundInterface for super::Store { &self, merchant_id: &str, refund_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let conn = pg_connection(&self.master_pool).await; Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id).await @@ -107,6 +124,7 @@ impl RefundInterface for super::Store { &self, payment_id: &str, merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let conn = pg_connection(&self.master_pool).await; Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await @@ -119,11 +137,16 @@ impl RefundInterface for MockDb { &self, _internal_reference_id: &str, _merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } - async fn insert_refund(&self, new: RefundNew) -> CustomResult { + async fn insert_refund( + &self, + new: RefundNew, + _storage_scheme: enums::MerchantStorageScheme, + ) -> CustomResult { let mut refunds = self.refunds.lock().await; let current_time = common_utils::date_time::now(); @@ -157,6 +180,7 @@ impl RefundInterface for MockDb { &self, merchant_id: &str, txn_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { let refunds = self.refunds.lock().await; @@ -173,6 +197,7 @@ impl RefundInterface for MockDb { &self, _this: Refund, _refund: RefundUpdate, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { todo!() } @@ -181,6 +206,7 @@ impl RefundInterface for MockDb { &self, merchant_id: &str, refund_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let refunds = self.refunds.lock().await; @@ -195,6 +221,7 @@ impl RefundInterface for MockDb { &self, _payment_id: &str, _merchant_id: &str, + _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { todo!() } diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index 1cfd8cc072..c6553349f9 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -121,7 +121,6 @@ pub fn mk_app( /// Unwrap used because without the value we can't start the server pub async fn start_server(conf: Settings) -> BachResult<(Server, AppState)> { logger::debug!(startup_config=?conf); - let server = conf.server.clone(); let state = routes::AppState::new(conf).await; // Cloning to close connections before shutdown diff --git a/crates/router/src/schema.rs b/crates/router/src/schema.rs index 7dd78457a8..9808bb05de 100644 --- a/crates/router/src/schema.rs +++ b/crates/router/src/schema.rs @@ -150,6 +150,7 @@ diesel::table! { sub_merchants_enabled -> Nullable, parent_merchant_id -> Nullable, publishable_key -> Nullable, + storage_scheme -> MerchantStorageScheme, } } diff --git a/crates/router/src/services/api.rs b/crates/router/src/services/api.rs index 044cea4f3d..d94bd3bacf 100644 --- a/crates/router/src/services/api.rs +++ b/crates/router/src/services/api.rs @@ -26,7 +26,11 @@ use crate::{ db::StorageInterface, logger, routes, routes::AppState, - types::{self, api, storage, ErrorResponse, Response}, + types::{ + self, api, + storage::{self, enums}, + ErrorResponse, Response, + }, utils::OptionExt, }; @@ -556,6 +560,7 @@ pub async fn authenticate_merchant<'a>( payment_response_hash_key: None, redirect_to_merchant_with_http_post: false, publishable_key: None, + storage_scheme: enums::MerchantStorageScheme::PostgresOnly, }) } diff --git a/crates/router/src/types/storage/enums.rs b/crates/router/src/types/storage/enums.rs index 0cb2224ade..a44577fe06 100644 --- a/crates/router/src/types/storage/enums.rs +++ b/crates/router/src/types/storage/enums.rs @@ -6,7 +6,8 @@ pub mod diesel_exports { DbEventClass as EventClass, DbEventObjectType as EventObjectType, DbEventType as EventType, DbFutureUsage as FutureUsage, DbIntentStatus as IntentStatus, DbMandateStatus as MandateStatus, DbMandateType as MandateType, - DbPaymentFlow as PaymentFlow, DbPaymentMethodIssuerCode as PaymentMethodIssuerCode, + DbMerchantStorageScheme as MerchantStorageScheme, DbPaymentFlow as PaymentFlow, + DbPaymentMethodIssuerCode as PaymentMethodIssuerCode, DbPaymentMethodSubType as PaymentMethodSubType, DbPaymentMethodType as PaymentMethodType, DbProcessTrackerStatus as ProcessTrackerStatus, DbRefundStatus as RefundStatus, DbRefundType as RefundType, DbRoutingAlgorithm as RoutingAlgorithm, @@ -362,6 +363,28 @@ pub enum FutureUsage { OnSession, } +#[derive( + Clone, + Copy, + Debug, + Default, + Eq, + PartialEq, + serde::Deserialize, + serde::Serialize, + strum::Display, + strum::EnumString, + router_derive::DieselEnum, +)] +#[router_derive::diesel_enum] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum MerchantStorageScheme { + #[default] + PostgresOnly, + RedisKv, +} + #[derive( Clone, Copy, diff --git a/crates/router/src/types/storage/merchant_account.rs b/crates/router/src/types/storage/merchant_account.rs index 7954dfac12..b9dd43aa44 100644 --- a/crates/router/src/types/storage/merchant_account.rs +++ b/crates/router/src/types/storage/merchant_account.rs @@ -20,6 +20,7 @@ pub struct MerchantAccount { pub sub_merchants_enabled: Option, pub parent_merchant_id: Option, pub publishable_key: Option, + pub storage_scheme: enums::MerchantStorageScheme, } #[derive(Clone, Debug, Default, Insertable, router_derive::DebugAsDisplay)] diff --git a/crates/router/src/types/storage/payment_attempt.rs b/crates/router/src/types/storage/payment_attempt.rs index b2bb0bc559..3694e40aa6 100644 --- a/crates/router/src/types/storage/payment_attempt.rs +++ b/crates/router/src/types/storage/payment_attempt.rs @@ -238,7 +238,12 @@ mod tests { use uuid::Uuid; use super::*; - use crate::{configs::settings::Settings, db::StorageImpl, routes, types}; + use crate::{ + configs::settings::Settings, + db::StorageImpl, + routes, + types::{self, storage::enums}, + }; #[actix_rt::test] #[ignore] @@ -259,7 +264,7 @@ mod tests { let response = state .store - .insert_payment_attempt(payment_attempt) + .insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly) .await .unwrap(); eprintln!("{:?}", response); @@ -289,13 +294,17 @@ mod tests { }; state .store - .insert_payment_attempt(payment_attempt) + .insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly) .await .unwrap(); let response = state .store - .find_payment_attempt_by_payment_id_merchant_id(&payment_id, &merchant_id) + .find_payment_attempt_by_payment_id_merchant_id( + &payment_id, + &merchant_id, + enums::MerchantStorageScheme::PostgresOnly, + ) .await .unwrap(); @@ -326,13 +335,17 @@ mod tests { }; state .store - .insert_payment_attempt(payment_attempt) + .insert_payment_attempt(payment_attempt, enums::MerchantStorageScheme::PostgresOnly) .await .unwrap(); let response = state .store - .find_payment_attempt_by_payment_id_merchant_id(&uuid, "1") + .find_payment_attempt_by_payment_id_merchant_id( + &uuid, + "1", + enums::MerchantStorageScheme::PostgresOnly, + ) .await .unwrap(); // checking it after fetch diff --git a/crates/router/src/types/storage/query/payment_attempt.rs b/crates/router/src/types/storage/query/payment_attempt.rs index 00c018d865..e7d17d0b31 100644 --- a/crates/router/src/types/storage/query/payment_attempt.rs +++ b/crates/router/src/types/storage/query/payment_attempt.rs @@ -8,7 +8,7 @@ use router_env::tracing::{self, instrument}; #[cfg(not(feature = "kv_store"))] use super::generics::{self, ExecuteQuery}; #[cfg(feature = "kv_store")] -use super::generics::{self, RawQuery, RawSqlQuery}; +use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; use crate::{ connection::PgPooledConn, core::errors::{self, CustomResult}, @@ -21,7 +21,6 @@ use crate::{ }; impl PaymentAttemptNew { - #[cfg(not(feature = "kv_store"))] #[instrument(skip(conn))] pub async fn insert_diesel( self, @@ -32,7 +31,7 @@ impl PaymentAttemptNew { #[cfg(feature = "kv_store")] #[instrument(skip(conn))] - pub async fn insert_diesel( + pub async fn insert_diesel_query( self, conn: &PgPooledConn, ) -> CustomResult { @@ -41,7 +40,6 @@ impl PaymentAttemptNew { } impl PaymentAttempt { - #[cfg(not(feature = "kv_store"))] #[instrument(skip(conn))] pub async fn update( self, @@ -68,7 +66,7 @@ impl PaymentAttempt { #[cfg(feature = "kv_store")] #[instrument(skip(conn))] - pub async fn update( + pub async fn update_query( self, conn: &PgPooledConn, payment_attempt: PaymentAttemptUpdate, diff --git a/crates/router/src/types/storage/query/payment_intent.rs b/crates/router/src/types/storage/query/payment_intent.rs index 2ca31e5f3c..c6e17a0b11 100644 --- a/crates/router/src/types/storage/query/payment_intent.rs +++ b/crates/router/src/types/storage/query/payment_intent.rs @@ -6,7 +6,7 @@ use router_env::tracing::{self, instrument}; #[cfg(not(feature = "kv_store"))] use super::generics::{self, ExecuteQuery}; #[cfg(feature = "kv_store")] -use super::generics::{self, RawQuery, RawSqlQuery}; +use super::generics::{self, ExecuteQuery, RawQuery, RawSqlQuery}; use crate::{ connection::PgPooledConn, core::errors::{self, CustomResult}, @@ -20,7 +20,6 @@ use crate::{ }; impl PaymentIntentNew { - #[cfg(not(feature = "kv_store"))] #[instrument(skip(conn))] pub async fn insert_diesel( self, @@ -31,7 +30,7 @@ impl PaymentIntentNew { #[cfg(feature = "kv_store")] #[instrument(skip(conn))] - pub async fn insert_diesel( + pub async fn insert_diesel_query( self, conn: &PgPooledConn, ) -> CustomResult { @@ -40,7 +39,6 @@ impl PaymentIntentNew { } impl PaymentIntent { - #[cfg(not(feature = "kv_store"))] #[instrument(skip(conn))] pub async fn update( self, @@ -67,7 +65,7 @@ impl PaymentIntent { #[cfg(feature = "kv_store")] #[instrument(skip(conn))] - pub async fn update( + pub async fn update_query( self, conn: &PgPooledConn, payment_intent: PaymentIntentUpdate, diff --git a/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/down.sql b/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/down.sql new file mode 100644 index 0000000000..14c5c102ec --- /dev/null +++ b/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/down.sql @@ -0,0 +1,5 @@ +-- This file should undo anything in `up.sql` + +ALTER TABLE merchant_account DROP COLUMN storage_scheme; + +DROP TYPE "MerchantStorageScheme"; diff --git a/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/up.sql b/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/up.sql new file mode 100644 index 0000000000..356d8858e2 --- /dev/null +++ b/migrations/2022-12-07-055441_add_use_kv_to_merchant_account/up.sql @@ -0,0 +1,8 @@ +-- Your SQL goes here + +CREATE TYPE "MerchantStorageScheme" AS ENUM ( + 'postgres_only', + 'redis_kv' +); + +ALTER TABLE merchant_account ADD COLUMN storage_scheme "MerchantStorageScheme" NOT NULL DEFAULT 'postgres_only';