refactor: get connection pool based on olap/oltp features (#743)

This commit is contained in:
Kartikeya Hegde
2023-03-20 11:35:10 +05:30
committed by GitHub
parent 853dfa1635
commit a392fb165d
18 changed files with 147 additions and 146 deletions

View File

@ -56,12 +56,42 @@ pub async fn diesel_make_pg_pool(database: &Database, test_transaction: bool) ->
.expect("Failed to create PostgreSQL connection pool")
}
pub async fn pg_connection(
pool: &PgPool,
pub async fn pg_connection_read(
store: &crate::services::Store,
) -> errors::CustomResult<
PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
errors::StorageError,
> {
// If only OLAP is enabled get replica pool.
#[cfg(all(feature = "olap", not(feature = "oltp")))]
let pool = &store.replica_pool;
// If either one of these are true we need to get master pool.
// 1. Only OLTP is enabled.
// 2. Both OLAP and OLTP is enabled.
// 3. Both OLAP and OLTP is disabled.
#[cfg(any(
all(not(feature = "olap"), feature = "oltp"),
all(feature = "olap", feature = "oltp"),
all(not(feature = "olap"), not(feature = "oltp"))
))]
let pool = &store.master_pool;
pool.get()
.await
.into_report()
.change_context(errors::StorageError::DatabaseConnectionError)
}
pub async fn pg_connection_write(
store: &crate::services::Store,
) -> errors::CustomResult<
PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
errors::StorageError,
> {
// Since all writes should happen to master DB only choose master DB.
let pool = &store.master_pool;
pool.get()
.await
.into_report()

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -39,7 +39,7 @@ impl AddressInterface for Store {
&self,
address_id: &str,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::Address::find_by_address_id(&conn, address_id)
.await
.map_err(Into::into)
@ -51,7 +51,7 @@ impl AddressInterface for Store {
address_id: String,
address: storage::AddressUpdate,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Address::update_by_address_id(&conn, address_id, address)
.await
.map_err(Into::into)
@ -62,7 +62,7 @@ impl AddressInterface for Store {
&self,
address: storage::AddressNew,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
address
.insert(&conn)
.await
@ -76,7 +76,7 @@ impl AddressInterface for Store {
merchant_id: &str,
address: storage::AddressUpdate,
) -> CustomResult<Vec<storage::Address>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Address::update_by_merchant_id_customer_id(
&conn,
customer_id,

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -46,7 +46,7 @@ impl ApiKeyInterface for Store {
&self,
api_key: storage::ApiKeyNew,
) -> CustomResult<storage::ApiKey, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
api_key
.insert(&conn)
.await
@ -59,7 +59,7 @@ impl ApiKeyInterface for Store {
key_id: String,
api_key: storage::ApiKeyUpdate,
) -> CustomResult<storage::ApiKey, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::ApiKey::update_by_key_id(&conn, key_id, api_key)
.await
.map_err(Into::into)
@ -67,7 +67,7 @@ impl ApiKeyInterface for Store {
}
async fn revoke_api_key(&self, key_id: &str) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::ApiKey::revoke_by_key_id(&conn, key_id)
.await
.map_err(Into::into)
@ -78,7 +78,7 @@ impl ApiKeyInterface for Store {
&self,
key_id: &str,
) -> CustomResult<Option<storage::ApiKey>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ApiKey::find_optional_by_key_id(&conn, key_id)
.await
.map_err(Into::into)
@ -89,7 +89,7 @@ impl ApiKeyInterface for Store {
&self,
hashed_api_key: storage::HashedApiKey,
) -> CustomResult<Option<storage::ApiKey>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ApiKey::find_optional_by_hashed_api_key(&conn, hashed_api_key)
.await
.map_err(Into::into)
@ -102,7 +102,7 @@ impl ApiKeyInterface for Store {
limit: Option<i64>,
offset: Option<i64>,
) -> CustomResult<Vec<storage::ApiKey>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ApiKey::find_by_merchant_id(&conn, merchant_id, limit, offset)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{cache, MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -45,7 +45,7 @@ impl ConfigInterface for Store {
&self,
config: storage::ConfigNew,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
config.insert(&conn).await.map_err(Into::into).into_report()
}
@ -53,7 +53,7 @@ impl ConfigInterface for Store {
&self,
key: &str,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Config::find_by_key(&conn, key)
.await
.map_err(Into::into)
@ -65,7 +65,7 @@ impl ConfigInterface for Store {
key: &str,
config_update: storage::ConfigUpdate,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Config::update_by_key(&conn, key, config_update)
.await
.map_err(Into::into)
@ -91,7 +91,7 @@ impl ConfigInterface for Store {
}
async fn delete_config_by_key(&self, key: &str) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Config::delete_by_key(&conn, key)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage::{self, enums},
};
@ -38,7 +38,7 @@ impl ConnectorResponseInterface for Store {
connector_response: storage::ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
connector_response
.insert(&conn)
.await
@ -53,7 +53,7 @@ impl ConnectorResponseInterface for Store {
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id(
&conn,
payment_id,
@ -71,7 +71,7 @@ impl ConnectorResponseInterface for Store {
connector_response_update: storage::ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, connector_response_update)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::{
customers::REDACTED,
errors::{self, CustomResult},
@ -50,7 +50,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<Option<storage::Customer>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
let maybe_customer = storage::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
@ -75,7 +75,7 @@ impl CustomerInterface for Store {
merchant_id: String,
customer: storage::CustomerUpdate,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Customer::update_by_customer_id_merchant_id(
&conn,
customer_id,
@ -92,7 +92,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
let customer =
storage::Customer::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
@ -108,7 +108,7 @@ impl CustomerInterface for Store {
&self,
customer_data: storage::CustomerNew,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
customer_data
.insert(&conn)
.await
@ -121,7 +121,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -21,7 +21,7 @@ impl EventInterface for Store {
&self,
event: storage::EventNew,
) -> CustomResult<storage::Event, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
event.insert(&conn).await.map_err(Into::into).into_report()
}
}

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -31,7 +31,7 @@ impl LockerMockUpInterface for Store {
&self,
card_id: &str,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::LockerMockUp::find_by_card_id(&conn, card_id)
.await
.map_err(Into::into)
@ -42,7 +42,7 @@ impl LockerMockUpInterface for Store {
&self,
new: storage::LockerMockUpNew,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -50,7 +50,7 @@ impl LockerMockUpInterface for Store {
&self,
card_id: &str,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::LockerMockUp::delete_by_card_id(&conn, card_id)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -41,7 +41,7 @@ impl MandateInterface for Store {
merchant_id: &str,
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id)
.await
.map_err(Into::into)
@ -53,7 +53,7 @@ impl MandateInterface for Store {
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id)
.await
.map_err(Into::into)
@ -66,7 +66,7 @@ impl MandateInterface for Store {
mandate_id: &str,
mandate: storage::MandateUpdate,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate)
.await
.map_err(Into::into)
@ -77,7 +77,7 @@ impl MandateInterface for Store {
&self,
mandate: storage::MandateNew,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
mandate
.insert(&conn)
.await

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage::{self, enums},
};
@ -48,7 +48,7 @@ impl MerchantAccountInterface for Store {
&self,
merchant_account: storage::MerchantAccountNew,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
merchant_account
.insert(&conn)
.await
@ -61,7 +61,7 @@ impl MerchantAccountInterface for Store {
merchant_id: &str,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let fetch_func = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::MerchantAccount::find_by_merchant_id(&conn, merchant_id)
.await
.map_err(Into::into)
@ -86,7 +86,7 @@ impl MerchantAccountInterface for Store {
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let _merchant_id = this.merchant_id.clone();
let update_func = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, merchant_account)
.await
.map_err(Into::into)
@ -110,7 +110,7 @@ impl MerchantAccountInterface for Store {
merchant_account: storage::MerchantAccountUpdate,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let update_func = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::MerchantAccount::update_with_specific_fields(
&conn,
merchant_id,
@ -136,7 +136,7 @@ impl MerchantAccountInterface for Store {
&self,
publishable_key: &str,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::MerchantAccount::find_by_publishable_key(&conn, publishable_key)
.await
.map_err(Into::into)
@ -148,7 +148,7 @@ impl MerchantAccountInterface for Store {
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let delete_func = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::MerchantAccount::delete_by_merchant_id(&conn, merchant_id)
.await
.map_err(Into::into)

View File

@ -4,7 +4,7 @@ use masking::ExposeInterface;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
services::logger,
types::{self, storage},
@ -141,7 +141,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_id: &str,
connector: &str,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_connector(
&conn,
merchant_id,
@ -158,7 +158,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_id: &str,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let find_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
@ -183,7 +183,7 @@ impl MerchantConnectorAccountInterface for Store {
&self,
t: storage::MerchantConnectorAccountNew,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
t.insert(&conn).await.map_err(Into::into).into_report()
}
@ -192,7 +192,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_id: &str,
get_disabled: bool,
) -> CustomResult<Vec<storage::MerchantConnectorAccount>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id(&conn, merchant_id, get_disabled)
.await
.map_err(Into::into)
@ -206,7 +206,7 @@ impl MerchantConnectorAccountInterface for Store {
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let _merchant_connector_id = this.merchant_connector_id.clone();
let update_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, merchant_connector_account)
.await
.map_err(Into::into)
@ -229,7 +229,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_id: &str,
merchant_connector_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,

View File

@ -62,7 +62,7 @@ mod storage {
use super::PaymentAttemptInterface;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{enums, payment_attempt::*},
@ -75,7 +75,7 @@ mod storage {
payment_attempt: PaymentAttemptNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
payment_attempt
.insert(&conn)
.await
@ -89,7 +89,7 @@ mod storage {
payment_attempt: PaymentAttemptUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, payment_attempt)
.await
.map_err(Into::into)
@ -102,7 +102,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -116,7 +116,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
@ -134,7 +134,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id(
&conn,
payment_id,
@ -151,7 +151,7 @@ mod storage {
connector_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
@ -168,7 +168,7 @@ mod storage {
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
@ -320,7 +320,7 @@ mod storage {
use super::PaymentAttemptInterface;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
db::reverse_lookup::ReverseLookupInterface,
services::Store,
@ -337,7 +337,7 @@ mod storage {
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
payment_attempt
.insert(&conn)
.await
@ -400,7 +400,7 @@ mod storage {
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
//Reverse lookup for attempt_id
ReverseLookupNew {
@ -448,7 +448,7 @@ mod storage {
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, payment_attempt)
.await
.map_err(Into::into)
@ -472,7 +472,7 @@ mod storage {
.map(|_| updated_attempt)
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
// Reverse lookup for connector_transaction_id
if let (None, Some(connector_transaction_id)) = (
old_connector_transaction_id,
@ -524,7 +524,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -560,7 +560,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
@ -618,7 +618,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
@ -654,7 +654,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)

View File

@ -47,7 +47,7 @@ mod storage {
#[cfg(feature = "olap")]
use crate::types::api;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{enums, kv, payment_intent::*},
@ -63,7 +63,7 @@ mod storage {
) -> CustomResult<PaymentIntent, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -135,7 +135,7 @@ mod storage {
) -> CustomResult<PaymentIntent, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, payment_intent)
.await
.map_err(Into::into)
@ -191,7 +191,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -222,7 +222,7 @@ mod storage {
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.replica_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)
@ -243,7 +243,7 @@ mod storage {
#[cfg(feature = "olap")]
use crate::types::api;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{enums, payment_intent::*},
@ -256,7 +256,7 @@ mod storage {
new: PaymentIntentNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -266,7 +266,7 @@ mod storage {
payment_intent: PaymentIntentUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, payment_intent)
.await
.map_err(Into::into)
@ -279,7 +279,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -293,7 +293,7 @@ mod storage {
pc: &api::PaymentListConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = pg_connection(&self.replica_pool).await?;
let conn = connection::pg_connection_read(self).await?;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage,
};
@ -38,7 +38,7 @@ impl PaymentMethodInterface for Store {
&self,
payment_method_id: &str,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id)
.await
.map_err(Into::into)
@ -49,7 +49,7 @@ impl PaymentMethodInterface for Store {
&self,
m: storage::PaymentMethodNew,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
m.insert(&conn).await.map_err(Into::into).into_report()
}
@ -58,7 +58,7 @@ impl PaymentMethodInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<Vec<storage::PaymentMethod>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
.map_err(Into::into)
@ -70,7 +70,7 @@ impl PaymentMethodInterface for Store {
merchant_id: &str,
payment_method_id: &str,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::PaymentMethod::delete_by_merchant_id_payment_method_id(
&conn,
merchant_id,

View File

@ -3,7 +3,7 @@ use time::PrimitiveDateTime;
use super::{MockDb, Store};
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
types::storage::{self, enums},
};
@ -58,7 +58,7 @@ impl ProcessTrackerInterface for Store {
&self,
id: &str,
) -> CustomResult<Option<storage::ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ProcessTracker::find_process_by_id(&conn, id)
.await
.map_err(Into::into)
@ -70,7 +70,7 @@ impl ProcessTrackerInterface for Store {
ids: Vec<String>,
schedule_time: PrimitiveDateTime,
) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::ProcessTracker::reinitialize_limbo_processes(&conn, ids, schedule_time)
.await
.map_err(Into::into)
@ -84,7 +84,7 @@ impl ProcessTrackerInterface for Store {
status: enums::ProcessTrackerStatus,
limit: Option<i64>,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage::ProcessTracker::find_processes_by_time_status(
&conn,
time_lower_limit,
@ -101,7 +101,7 @@ impl ProcessTrackerInterface for Store {
&self,
new: storage::ProcessTrackerNew,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert_process(&conn)
.await
.map_err(Into::into)
@ -113,7 +113,7 @@ impl ProcessTrackerInterface for Store {
this: storage::ProcessTracker,
process: storage::ProcessTrackerUpdate,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, process)
.await
.map_err(Into::into)
@ -125,7 +125,7 @@ impl ProcessTrackerInterface for Store {
this: storage::ProcessTracker,
process: storage::ProcessTrackerUpdate,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, process)
.await
.map_err(Into::into)
@ -137,7 +137,7 @@ impl ProcessTrackerInterface for Store {
task_ids: Vec<String>,
task_update: storage::ProcessTrackerUpdate,
) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update)
.await
.map_err(Into::into)

View File

@ -22,13 +22,6 @@ pub trait RefundInterface {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError>;
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError>;
async fn find_refund_by_merchant_id_refund_id(
&self,
merchant_id: &str,
@ -80,7 +73,7 @@ mod storage {
use super::RefundInterface;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
services::Store,
types::storage::{self as storage_types, enums},
@ -94,7 +87,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
@ -110,7 +103,7 @@ mod storage {
new: storage_types::RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -120,7 +113,7 @@ mod storage {
connector_transaction_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_connector_transaction_id(
&conn,
merchant_id,
@ -137,7 +130,7 @@ mod storage {
refund: storage_types::RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, refund)
.await
.map_err(Into::into)
@ -150,7 +143,7 @@ mod storage {
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
@ -164,7 +157,7 @@ mod storage {
connector: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_connector_refund_id_connector(
&conn,
merchant_id,
@ -176,24 +169,13 @@ mod storage {
.into_report()
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.master_pool).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await
// }
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -208,7 +190,7 @@ mod storage {
_storage_scheme: enums::MerchantStorageScheme,
limit: i64,
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
let conn = pg_connection(&self.replica_pool).await?;
let conn = connection::pg_connection_read(self).await?;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(
&conn,
merchant_id,
@ -230,7 +212,7 @@ mod storage {
use super::RefundInterface;
use crate::{
connection::pg_connection,
connection,
core::errors::{self, CustomResult},
db::reverse_lookup::ReverseLookupInterface,
logger,
@ -247,7 +229,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
@ -282,7 +264,7 @@ mod storage {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
enums::MerchantStorageScheme::RedisKv => {
@ -332,7 +314,7 @@ mod storage {
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
let mut reverse_lookups = vec![
storage_types::ReverseLookupNew {
@ -405,7 +387,7 @@ mod storage {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_connector_transaction_id(
&conn,
merchant_id,
@ -445,7 +427,7 @@ mod storage {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
this.update(&conn, refund)
.await
.map_err(Into::into)
@ -501,7 +483,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
@ -533,7 +515,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_merchant_id_connector_refund_id_connector(
&conn,
merchant_id,
@ -562,17 +544,6 @@ mod storage {
}
}
// async fn find_refund_by_payment_id_merchant_id_refund_id(
// &self,
// payment_id: &str,
// merchant_id: &str,
// refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.master_pool).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await
// }
async fn find_refund_by_payment_id_merchant_id(
&self,
payment_id: &str,
@ -581,7 +552,7 @@ mod storage {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
storage_types::Refund::find_by_payment_id_merchant_id(
&conn,
payment_id,
@ -616,7 +587,7 @@ mod storage {
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.replica_pool).await?;
let conn = connection::pg_connection_read(self).await?;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(&conn, merchant_id, refund_details, limit)
.await
.map_err(Into::into)

View File

@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{cache, MockDb, Store};
use crate::{
connection::pg_connection,
connection,
errors::{self, CustomResult},
types::storage::reverse_lookup::{ReverseLookup, ReverseLookupNew},
};
@ -25,7 +25,7 @@ impl ReverseLookupInterface for Store {
&self,
new: ReverseLookupNew,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_write(self).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -34,7 +34,7 @@ impl ReverseLookupInterface for Store {
id: &str,
) -> CustomResult<ReverseLookup, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await?;
let conn = connection::pg_connection_read(self).await?;
ReverseLookup::find_by_lookup_id(id, &conn)
.await
.map_err(Into::into)

View File

@ -361,7 +361,7 @@ async fn payments_create_core() {
// let state = routes::AppState {
// flow_name: String::from("default"),
// pg_conn: connection::pg_connection(&conf),
// pg_conn: connection::pg_connection_read(&conf),
// redis_conn: connection::redis_connection(&conf).await,
// };