diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index f8775682f5..f692d44691 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -56,12 +56,42 @@ pub async fn diesel_make_pg_pool(database: &Database, test_transaction: bool) -> .expect("Failed to create PostgreSQL connection pool") } -pub async fn pg_connection( - pool: &PgPool, +pub async fn pg_connection_read( + store: &crate::services::Store, ) -> errors::CustomResult< PooledConnection<'_, async_bb8_diesel::ConnectionManager>, errors::StorageError, > { + // If only OLAP is enabled get replica pool. + #[cfg(all(feature = "olap", not(feature = "oltp")))] + let pool = &store.replica_pool; + + // If either one of these are true we need to get master pool. + // 1. Only OLTP is enabled. + // 2. Both OLAP and OLTP is enabled. + // 3. Both OLAP and OLTP is disabled. + #[cfg(any( + all(not(feature = "olap"), feature = "oltp"), + all(feature = "olap", feature = "oltp"), + all(not(feature = "olap"), not(feature = "oltp")) + ))] + let pool = &store.master_pool; + + pool.get() + .await + .into_report() + .change_context(errors::StorageError::DatabaseConnectionError) +} + +pub async fn pg_connection_write( + store: &crate::services::Store, +) -> errors::CustomResult< + PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + errors::StorageError, +> { + // Since all writes should happen to master DB only choose master DB. + let pool = &store.master_pool; + pool.get() .await .into_report() diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index d49fc96c0c..d0049d8b4b 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -39,7 +39,7 @@ impl AddressInterface for Store { &self, address_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::Address::find_by_address_id(&conn, address_id) .await .map_err(Into::into) @@ -51,7 +51,7 @@ impl AddressInterface for Store { address_id: String, address: storage::AddressUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Address::update_by_address_id(&conn, address_id, address) .await .map_err(Into::into) @@ -62,7 +62,7 @@ impl AddressInterface for Store { &self, address: storage::AddressNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; address .insert(&conn) .await @@ -76,7 +76,7 @@ impl AddressInterface for Store { merchant_id: &str, address: storage::AddressUpdate, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Address::update_by_merchant_id_customer_id( &conn, customer_id, diff --git a/crates/router/src/db/api_keys.rs b/crates/router/src/db/api_keys.rs index 3fc1730285..53901974c4 100644 --- a/crates/router/src/db/api_keys.rs +++ b/crates/router/src/db/api_keys.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -46,7 +46,7 @@ impl ApiKeyInterface for Store { &self, api_key: storage::ApiKeyNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; api_key .insert(&conn) .await @@ -59,7 +59,7 @@ impl ApiKeyInterface for Store { key_id: String, api_key: storage::ApiKeyUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::ApiKey::update_by_key_id(&conn, key_id, api_key) .await .map_err(Into::into) @@ -67,7 +67,7 @@ impl ApiKeyInterface for Store { } async fn revoke_api_key(&self, key_id: &str) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::ApiKey::revoke_by_key_id(&conn, key_id) .await .map_err(Into::into) @@ -78,7 +78,7 @@ impl ApiKeyInterface for Store { &self, key_id: &str, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ApiKey::find_optional_by_key_id(&conn, key_id) .await .map_err(Into::into) @@ -89,7 +89,7 @@ impl ApiKeyInterface for Store { &self, hashed_api_key: storage::HashedApiKey, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ApiKey::find_optional_by_hashed_api_key(&conn, hashed_api_key) .await .map_err(Into::into) @@ -102,7 +102,7 @@ impl ApiKeyInterface for Store { limit: Option, offset: Option, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ApiKey::find_by_merchant_id(&conn, merchant_id, limit, offset) .await .map_err(Into::into) diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index b55f04b7fc..eae44ed0da 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{cache, MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -45,7 +45,7 @@ impl ConfigInterface for Store { &self, config: storage::ConfigNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; config.insert(&conn).await.map_err(Into::into).into_report() } @@ -53,7 +53,7 @@ impl ConfigInterface for Store { &self, key: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Config::find_by_key(&conn, key) .await .map_err(Into::into) @@ -65,7 +65,7 @@ impl ConfigInterface for Store { key: &str, config_update: storage::ConfigUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Config::update_by_key(&conn, key, config_update) .await .map_err(Into::into) @@ -91,7 +91,7 @@ impl ConfigInterface for Store { } async fn delete_config_by_key(&self, key: &str) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Config::delete_by_key(&conn, key) .await .map_err(Into::into) diff --git a/crates/router/src/db/connector_response.rs b/crates/router/src/db/connector_response.rs index aabca6e455..bf298fa4c0 100644 --- a/crates/router/src/db/connector_response.rs +++ b/crates/router/src/db/connector_response.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage::{self, enums}, }; @@ -38,7 +38,7 @@ impl ConnectorResponseInterface for Store { connector_response: storage::ConnectorResponseNew, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; connector_response .insert(&conn) .await @@ -53,7 +53,7 @@ impl ConnectorResponseInterface for Store { attempt_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id( &conn, payment_id, @@ -71,7 +71,7 @@ impl ConnectorResponseInterface for Store { connector_response_update: storage::ConnectorResponseUpdate, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, connector_response_update) .await .map_err(Into::into) diff --git a/crates/router/src/db/customers.rs b/crates/router/src/db/customers.rs index ea453eea2e..f9f7ea96d2 100644 --- a/crates/router/src/db/customers.rs +++ b/crates/router/src/db/customers.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::{ customers::REDACTED, errors::{self, CustomResult}, @@ -50,7 +50,7 @@ impl CustomerInterface for Store { customer_id: &str, merchant_id: &str, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; let maybe_customer = storage::Customer::find_optional_by_customer_id_merchant_id( &conn, customer_id, @@ -75,7 +75,7 @@ impl CustomerInterface for Store { merchant_id: String, customer: storage::CustomerUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Customer::update_by_customer_id_merchant_id( &conn, customer_id, @@ -92,7 +92,7 @@ impl CustomerInterface for Store { customer_id: &str, merchant_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; let customer = storage::Customer::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id) .await @@ -108,7 +108,7 @@ impl CustomerInterface for Store { &self, customer_data: storage::CustomerNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; customer_data .insert(&conn) .await @@ -121,7 +121,7 @@ impl CustomerInterface for Store { customer_id: &str, merchant_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id) .await .map_err(Into::into) diff --git a/crates/router/src/db/events.rs b/crates/router/src/db/events.rs index ca29d0ff62..6b512eb07e 100644 --- a/crates/router/src/db/events.rs +++ b/crates/router/src/db/events.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -21,7 +21,7 @@ impl EventInterface for Store { &self, event: storage::EventNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; event.insert(&conn).await.map_err(Into::into).into_report() } } diff --git a/crates/router/src/db/locker_mock_up.rs b/crates/router/src/db/locker_mock_up.rs index 0c6e8cc675..30b1beee45 100644 --- a/crates/router/src/db/locker_mock_up.rs +++ b/crates/router/src/db/locker_mock_up.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -31,7 +31,7 @@ impl LockerMockUpInterface for Store { &self, card_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::LockerMockUp::find_by_card_id(&conn, card_id) .await .map_err(Into::into) @@ -42,7 +42,7 @@ impl LockerMockUpInterface for Store { &self, new: storage::LockerMockUpNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } @@ -50,7 +50,7 @@ impl LockerMockUpInterface for Store { &self, card_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::LockerMockUp::delete_by_card_id(&conn, card_id) .await .map_err(Into::into) diff --git a/crates/router/src/db/mandate.rs b/crates/router/src/db/mandate.rs index ea68a9e43c..b3cbbbd078 100644 --- a/crates/router/src/db/mandate.rs +++ b/crates/router/src/db/mandate.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -41,7 +41,7 @@ impl MandateInterface for Store { merchant_id: &str, mandate_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id) .await .map_err(Into::into) @@ -53,7 +53,7 @@ impl MandateInterface for Store { merchant_id: &str, customer_id: &str, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id) .await .map_err(Into::into) @@ -66,7 +66,7 @@ impl MandateInterface for Store { mandate_id: &str, mandate: storage::MandateUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate) .await .map_err(Into::into) @@ -77,7 +77,7 @@ impl MandateInterface for Store { &self, mandate: storage::MandateNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; mandate .insert(&conn) .await diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 853e3e72c7..e6625ad764 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage::{self, enums}, }; @@ -48,7 +48,7 @@ impl MerchantAccountInterface for Store { &self, merchant_account: storage::MerchantAccountNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; merchant_account .insert(&conn) .await @@ -61,7 +61,7 @@ impl MerchantAccountInterface for Store { merchant_id: &str, ) -> CustomResult { let fetch_func = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::MerchantAccount::find_by_merchant_id(&conn, merchant_id) .await .map_err(Into::into) @@ -86,7 +86,7 @@ impl MerchantAccountInterface for Store { ) -> CustomResult { let _merchant_id = this.merchant_id.clone(); let update_func = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, merchant_account) .await .map_err(Into::into) @@ -110,7 +110,7 @@ impl MerchantAccountInterface for Store { merchant_account: storage::MerchantAccountUpdate, ) -> CustomResult { let update_func = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::MerchantAccount::update_with_specific_fields( &conn, merchant_id, @@ -136,7 +136,7 @@ impl MerchantAccountInterface for Store { &self, publishable_key: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::MerchantAccount::find_by_publishable_key(&conn, publishable_key) .await .map_err(Into::into) @@ -148,7 +148,7 @@ impl MerchantAccountInterface for Store { merchant_id: &str, ) -> CustomResult { let delete_func = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::MerchantAccount::delete_by_merchant_id(&conn, merchant_id) .await .map_err(Into::into) diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 640a4dd503..2154c056f9 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -4,7 +4,7 @@ use masking::ExposeInterface; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, services::logger, types::{self, storage}, @@ -141,7 +141,7 @@ impl MerchantConnectorAccountInterface for Store { merchant_id: &str, connector: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::MerchantConnectorAccount::find_by_merchant_id_connector( &conn, merchant_id, @@ -158,7 +158,7 @@ impl MerchantConnectorAccountInterface for Store { merchant_connector_id: &str, ) -> CustomResult { let find_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id( &conn, merchant_id, @@ -183,7 +183,7 @@ impl MerchantConnectorAccountInterface for Store { &self, t: storage::MerchantConnectorAccountNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; t.insert(&conn).await.map_err(Into::into).into_report() } @@ -192,7 +192,7 @@ impl MerchantConnectorAccountInterface for Store { merchant_id: &str, get_disabled: bool, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::MerchantConnectorAccount::find_by_merchant_id(&conn, merchant_id, get_disabled) .await .map_err(Into::into) @@ -206,7 +206,7 @@ impl MerchantConnectorAccountInterface for Store { ) -> CustomResult { let _merchant_connector_id = this.merchant_connector_id.clone(); let update_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, merchant_connector_account) .await .map_err(Into::into) @@ -229,7 +229,7 @@ impl MerchantConnectorAccountInterface for Store { merchant_id: &str, merchant_connector_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id( &conn, merchant_id, diff --git a/crates/router/src/db/payment_attempt.rs b/crates/router/src/db/payment_attempt.rs index 9599dc8f73..a74d751eca 100644 --- a/crates/router/src/db/payment_attempt.rs +++ b/crates/router/src/db/payment_attempt.rs @@ -62,7 +62,7 @@ mod storage { use super::PaymentAttemptInterface; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, services::Store, types::storage::{enums, payment_attempt::*}, @@ -75,7 +75,7 @@ mod storage { payment_attempt: PaymentAttemptNew, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; payment_attempt .insert(&conn) .await @@ -89,7 +89,7 @@ mod storage { payment_attempt: PaymentAttemptUpdate, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, payment_attempt) .await .map_err(Into::into) @@ -102,7 +102,7 @@ mod storage { merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) .await .map_err(Into::into) @@ -116,7 +116,7 @@ mod storage { merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id( &conn, connector_transaction_id, @@ -134,7 +134,7 @@ mod storage { merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id( &conn, payment_id, @@ -151,7 +151,7 @@ mod storage { connector_txn_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_merchant_id_connector_txn_id( &conn, merchant_id, @@ -168,7 +168,7 @@ mod storage { attempt_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id) .await @@ -320,7 +320,7 @@ mod storage { use super::PaymentAttemptInterface; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, db::reverse_lookup::ReverseLookupInterface, services::Store, @@ -337,7 +337,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; payment_attempt .insert(&conn) .await @@ -400,7 +400,7 @@ mod storage { }) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; //Reverse lookup for attempt_id ReverseLookupNew { @@ -448,7 +448,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, payment_attempt) .await .map_err(Into::into) @@ -472,7 +472,7 @@ mod storage { .map(|_| updated_attempt) .change_context(errors::StorageError::KVError)?; - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; // Reverse lookup for connector_transaction_id if let (None, Some(connector_transaction_id)) = ( old_connector_transaction_id, @@ -524,7 +524,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) .await .map_err(Into::into) @@ -560,7 +560,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id( &conn, connector_transaction_id, @@ -618,7 +618,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_merchant_id_connector_txn_id( &conn, merchant_id, @@ -654,7 +654,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id) .await .map_err(Into::into) diff --git a/crates/router/src/db/payment_intent.rs b/crates/router/src/db/payment_intent.rs index 4a6d2c2850..b177f5307b 100644 --- a/crates/router/src/db/payment_intent.rs +++ b/crates/router/src/db/payment_intent.rs @@ -47,7 +47,7 @@ mod storage { #[cfg(feature = "olap")] use crate::types::api; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, services::Store, types::storage::{enums, kv, payment_intent::*}, @@ -63,7 +63,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } @@ -135,7 +135,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, payment_intent) .await .map_err(Into::into) @@ -191,7 +191,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) .await .map_err(Into::into) @@ -222,7 +222,7 @@ mod storage { ) -> CustomResult, errors::StorageError> { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.replica_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentIntent::filter_by_constraints(&conn, merchant_id, pc) .await .map_err(Into::into) @@ -243,7 +243,7 @@ mod storage { #[cfg(feature = "olap")] use crate::types::api; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, services::Store, types::storage::{enums, payment_intent::*}, @@ -256,7 +256,7 @@ mod storage { new: PaymentIntentNew, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } @@ -266,7 +266,7 @@ mod storage { payment_intent: PaymentIntentUpdate, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, payment_intent) .await .map_err(Into::into) @@ -279,7 +279,7 @@ mod storage { merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) .await .map_err(Into::into) @@ -293,7 +293,7 @@ mod storage { pc: &api::PaymentListConstraints, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.replica_pool).await?; + let conn = connection::pg_connection_read(self).await?; PaymentIntent::filter_by_constraints(&conn, merchant_id, pc) .await .map_err(Into::into) diff --git a/crates/router/src/db/payment_method.rs b/crates/router/src/db/payment_method.rs index a01821caaf..acd81eff14 100644 --- a/crates/router/src/db/payment_method.rs +++ b/crates/router/src/db/payment_method.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -38,7 +38,7 @@ impl PaymentMethodInterface for Store { &self, payment_method_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id) .await .map_err(Into::into) @@ -49,7 +49,7 @@ impl PaymentMethodInterface for Store { &self, m: storage::PaymentMethodNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; m.insert(&conn).await.map_err(Into::into).into_report() } @@ -58,7 +58,7 @@ impl PaymentMethodInterface for Store { customer_id: &str, merchant_id: &str, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id) .await .map_err(Into::into) @@ -70,7 +70,7 @@ impl PaymentMethodInterface for Store { merchant_id: &str, payment_method_id: &str, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::PaymentMethod::delete_by_merchant_id_payment_method_id( &conn, merchant_id, diff --git a/crates/router/src/db/process_tracker.rs b/crates/router/src/db/process_tracker.rs index 0e44073aa7..f142586185 100644 --- a/crates/router/src/db/process_tracker.rs +++ b/crates/router/src/db/process_tracker.rs @@ -3,7 +3,7 @@ use time::PrimitiveDateTime; use super::{MockDb, Store}; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, types::storage::{self, enums}, }; @@ -58,7 +58,7 @@ impl ProcessTrackerInterface for Store { &self, id: &str, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ProcessTracker::find_process_by_id(&conn, id) .await .map_err(Into::into) @@ -70,7 +70,7 @@ impl ProcessTrackerInterface for Store { ids: Vec, schedule_time: PrimitiveDateTime, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::ProcessTracker::reinitialize_limbo_processes(&conn, ids, schedule_time) .await .map_err(Into::into) @@ -84,7 +84,7 @@ impl ProcessTrackerInterface for Store { status: enums::ProcessTrackerStatus, limit: Option, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage::ProcessTracker::find_processes_by_time_status( &conn, time_lower_limit, @@ -101,7 +101,7 @@ impl ProcessTrackerInterface for Store { &self, new: storage::ProcessTrackerNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert_process(&conn) .await .map_err(Into::into) @@ -113,7 +113,7 @@ impl ProcessTrackerInterface for Store { this: storage::ProcessTracker, process: storage::ProcessTrackerUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, process) .await .map_err(Into::into) @@ -125,7 +125,7 @@ impl ProcessTrackerInterface for Store { this: storage::ProcessTracker, process: storage::ProcessTrackerUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, process) .await .map_err(Into::into) @@ -137,7 +137,7 @@ impl ProcessTrackerInterface for Store { task_ids: Vec, task_update: storage::ProcessTrackerUpdate, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update) .await .map_err(Into::into) diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index 2c2dc78c81..e3ff8b8dc6 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -22,13 +22,6 @@ pub trait RefundInterface { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError>; - // async fn find_refund_by_payment_id_merchant_id_refund_id( - // &self, - // payment_id: &str, - // merchant_id: &str, - // refund_id: &str, - // ) -> CustomResult; - async fn find_refund_by_merchant_id_refund_id( &self, merchant_id: &str, @@ -80,7 +73,7 @@ mod storage { use super::RefundInterface; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, services::Store, types::storage::{self as storage_types, enums}, @@ -94,7 +87,7 @@ mod storage { merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_internal_reference_id_merchant_id( &conn, internal_reference_id, @@ -110,7 +103,7 @@ mod storage { new: storage_types::RefundNew, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } @@ -120,7 +113,7 @@ mod storage { connector_transaction_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_connector_transaction_id( &conn, merchant_id, @@ -137,7 +130,7 @@ mod storage { refund: storage_types::RefundUpdate, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, refund) .await .map_err(Into::into) @@ -150,7 +143,7 @@ mod storage { refund_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id) .await .map_err(Into::into) @@ -164,7 +157,7 @@ mod storage { connector: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_connector_refund_id_connector( &conn, merchant_id, @@ -176,24 +169,13 @@ mod storage { .into_report() } - // async fn find_refund_by_payment_id_merchant_id_refund_id( - // &self, - // payment_id: &str, - // merchant_id: &str, - // refund_id: &str, - // ) -> CustomResult { - // let conn = pg_connection(&self.master_pool).await; - // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) - // .await - // } - async fn find_refund_by_payment_id_merchant_id( &self, payment_id: &str, merchant_id: &str, _storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id) .await .map_err(Into::into) @@ -208,7 +190,7 @@ mod storage { _storage_scheme: enums::MerchantStorageScheme, limit: i64, ) -> CustomResult, errors::StorageError> { - let conn = pg_connection(&self.replica_pool).await?; + let conn = connection::pg_connection_read(self).await?; ::filter_by_constraints( &conn, merchant_id, @@ -230,7 +212,7 @@ mod storage { use super::RefundInterface; use crate::{ - connection::pg_connection, + connection, core::errors::{self, CustomResult}, db::reverse_lookup::ReverseLookupInterface, logger, @@ -247,7 +229,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_internal_reference_id_merchant_id( &conn, internal_reference_id, @@ -282,7 +264,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } enums::MerchantStorageScheme::RedisKv => { @@ -332,7 +314,7 @@ mod storage { }) .into_report(), Ok(HsetnxReply::KeySet) => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; let mut reverse_lookups = vec![ storage_types::ReverseLookupNew { @@ -405,7 +387,7 @@ mod storage { ) -> CustomResult, errors::StorageError> { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_connector_transaction_id( &conn, merchant_id, @@ -445,7 +427,7 @@ mod storage { ) -> CustomResult { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; this.update(&conn, refund) .await .map_err(Into::into) @@ -501,7 +483,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id) .await .map_err(Into::into) @@ -533,7 +515,7 @@ mod storage { storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_merchant_id_connector_refund_id_connector( &conn, merchant_id, @@ -562,17 +544,6 @@ mod storage { } } - // async fn find_refund_by_payment_id_merchant_id_refund_id( - // &self, - // payment_id: &str, - // merchant_id: &str, - // refund_id: &str, - // ) -> CustomResult { - // let conn = pg_connection(&self.master_pool).await; - // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) - // .await - // } - async fn find_refund_by_payment_id_merchant_id( &self, payment_id: &str, @@ -581,7 +552,7 @@ mod storage { ) -> CustomResult, errors::StorageError> { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; storage_types::Refund::find_by_payment_id_merchant_id( &conn, payment_id, @@ -616,7 +587,7 @@ mod storage { ) -> CustomResult, errors::StorageError> { match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { - let conn = pg_connection(&self.replica_pool).await?; + let conn = connection::pg_connection_read(self).await?; ::filter_by_constraints(&conn, merchant_id, refund_details, limit) .await .map_err(Into::into) diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index 3465fa01a0..fbccc77026 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -2,7 +2,7 @@ use error_stack::IntoReport; use super::{cache, MockDb, Store}; use crate::{ - connection::pg_connection, + connection, errors::{self, CustomResult}, types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew}, }; @@ -25,7 +25,7 @@ impl ReverseLookupInterface for Store { &self, new: ReverseLookupNew, ) -> CustomResult { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_write(self).await?; new.insert(&conn).await.map_err(Into::into).into_report() } @@ -34,7 +34,7 @@ impl ReverseLookupInterface for Store { id: &str, ) -> CustomResult { let database_call = || async { - let conn = pg_connection(&self.master_pool).await?; + let conn = connection::pg_connection_read(self).await?; ReverseLookup::find_by_lookup_id(id, &conn) .await .map_err(Into::into) diff --git a/crates/router/tests/payments.rs b/crates/router/tests/payments.rs index 55d990856b..f4f83f4772 100644 --- a/crates/router/tests/payments.rs +++ b/crates/router/tests/payments.rs @@ -361,7 +361,7 @@ async fn payments_create_core() { // let state = routes::AppState { // flow_name: String::from("default"), -// pg_conn: connection::pg_connection(&conf), +// pg_conn: connection::pg_connection_read(&conf), // redis_conn: connection::redis_connection(&conf).await, // };