refactor: Throw 500 error on database connection error instead of panic (#527)

This commit is contained in:
Kartikeya Hegde
2023-02-14 17:14:23 +05:30
committed by GitHub
parent aafb115acb
commit f1e3bf4895
23 changed files with 117 additions and 98 deletions

View File

@ -20,6 +20,7 @@ impl Default for super::settings::Database {
port: 5432,
dbname: String::new(),
pool_size: 5,
connection_timeout: 10,
}
}
}

View File

@ -123,6 +123,7 @@ pub struct Database {
pub port: u16,
pub dbname: String,
pub pool_size: u32,
pub connection_timeout: u64,
}
#[derive(Debug, Deserialize, Clone)]

View File

@ -1,8 +1,9 @@
use async_bb8_diesel::{AsyncConnection, ConnectionError};
use bb8::{CustomizeConnection, PooledConnection};
use diesel::PgConnection;
use error_stack::{IntoReport, ResultExt};
use crate::configs::settings::Database;
use crate::{configs::settings::Database, errors};
pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
@ -42,7 +43,9 @@ pub async fn diesel_make_pg_pool(database: &Database, test_transaction: bool) ->
database.username, database.password, database.host, database.port, database.dbname
);
let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new(database_url);
let mut pool = bb8::Pool::builder().max_size(database.pool_size);
let mut pool = bb8::Pool::builder()
.max_size(database.pool_size)
.connection_timeout(std::time::Duration::from_secs(database.connection_timeout));
if test_transaction {
pool = pool.connection_customizer(Box::new(TestTransaction));
@ -53,11 +56,14 @@ pub async fn diesel_make_pg_pool(database: &Database, test_transaction: bool) ->
.expect("Failed to create PostgreSQL connection pool")
}
#[allow(clippy::expect_used)]
pub async fn pg_connection(
pool: &PgPool,
) -> PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>> {
) -> errors::CustomResult<
PooledConnection<'_, async_bb8_diesel::ConnectionManager<PgConnection>>,
errors::StorageError,
> {
pool.get()
.await
.expect("Couldn't retrieve PostgreSQL connection")
.into_report()
.change_context(errors::StorageError::DatabaseConnectionError)
}

View File

@ -59,6 +59,8 @@ pub enum StorageError {
entity: &'static str,
key: Option<String>,
},
#[error("Timed out while trying to connect to the database")]
DatabaseConnectionError,
#[error("KV error")]
KVError,
#[error("Serialization failure")]

View File

@ -39,7 +39,7 @@ impl AddressInterface for Store {
&self,
address_id: &str,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Address::find_by_address_id(&conn, address_id)
.await
.map_err(Into::into)
@ -51,7 +51,7 @@ impl AddressInterface for Store {
address_id: String,
address: storage::AddressUpdate,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Address::update_by_address_id(&conn, address_id, address)
.await
.map_err(Into::into)
@ -62,7 +62,7 @@ impl AddressInterface for Store {
&self,
address: storage::AddressNew,
) -> CustomResult<storage::Address, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
address
.insert(&conn)
.await
@ -76,7 +76,7 @@ impl AddressInterface for Store {
merchant_id: &str,
address: storage::AddressUpdate,
) -> CustomResult<Vec<storage::Address>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Address::update_by_merchant_id_customer_id(
&conn,
customer_id,

View File

@ -41,7 +41,7 @@ impl ApiKeyInterface for Store {
&self,
api_key: storage::ApiKeyNew,
) -> CustomResult<storage::ApiKey, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
api_key
.insert(&conn)
.await
@ -54,7 +54,7 @@ impl ApiKeyInterface for Store {
key_id: String,
api_key: storage::ApiKeyUpdate,
) -> CustomResult<storage::ApiKey, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ApiKey::update_by_key_id(&conn, key_id, api_key)
.await
.map_err(Into::into)
@ -62,7 +62,7 @@ impl ApiKeyInterface for Store {
}
async fn revoke_api_key(&self, key_id: &str) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ApiKey::revoke_by_key_id(&conn, key_id)
.await
.map_err(Into::into)
@ -73,7 +73,7 @@ impl ApiKeyInterface for Store {
&self,
key_id: &str,
) -> CustomResult<Option<storage::ApiKey>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ApiKey::find_optional_by_key_id(&conn, key_id)
.await
.map_err(Into::into)
@ -86,7 +86,7 @@ impl ApiKeyInterface for Store {
limit: Option<i64>,
offset: Option<i64>,
) -> CustomResult<Vec<storage::ApiKey>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ApiKey::find_by_merchant_id(&conn, merchant_id, limit, offset)
.await
.map_err(Into::into)

View File

@ -45,7 +45,7 @@ impl ConfigInterface for Store {
&self,
config: storage::ConfigNew,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
config.insert(&conn).await.map_err(Into::into).into_report()
}
@ -53,7 +53,7 @@ impl ConfigInterface for Store {
&self,
key: &str,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Config::find_by_key(&conn, key)
.await
.map_err(Into::into)
@ -65,7 +65,7 @@ impl ConfigInterface for Store {
key: &str,
config_update: storage::ConfigUpdate,
) -> CustomResult<storage::Config, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Config::update_by_key(&conn, key, config_update)
.await
.map_err(Into::into)
@ -91,7 +91,7 @@ impl ConfigInterface for Store {
}
async fn delete_config_by_key(&self, key: &str) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Config::delete_by_key(&conn, key)
.await
.map_err(Into::into)

View File

@ -38,7 +38,7 @@ impl ConnectorResponseInterface for Store {
connector_response: storage::ConnectorResponseNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
connector_response
.insert(&conn)
.await
@ -53,7 +53,7 @@ impl ConnectorResponseInterface for Store {
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ConnectorResponse::find_by_payment_id_merchant_id_attempt_id(
&conn,
payment_id,
@ -71,7 +71,7 @@ impl ConnectorResponseInterface for Store {
connector_response_update: storage::ConnectorResponseUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage::ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, connector_response_update)
.await
.map_err(Into::into)

View File

@ -50,7 +50,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<Option<storage::Customer>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
let maybe_customer = storage::Customer::find_optional_by_customer_id_merchant_id(
&conn,
customer_id,
@ -75,7 +75,7 @@ impl CustomerInterface for Store {
merchant_id: String,
customer: storage::CustomerUpdate,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Customer::update_by_customer_id_merchant_id(
&conn,
customer_id,
@ -92,7 +92,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
let customer =
storage::Customer::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
@ -108,7 +108,7 @@ impl CustomerInterface for Store {
&self,
customer_data: storage::CustomerNew,
) -> CustomResult<storage::Customer, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
customer_data
.insert(&conn)
.await
@ -121,7 +121,7 @@ impl CustomerInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
.map_err(Into::into)

View File

@ -21,7 +21,7 @@ impl EventInterface for Store {
&self,
event: storage::EventNew,
) -> CustomResult<storage::Event, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
event.insert(&conn).await.map_err(Into::into).into_report()
}
}

View File

@ -31,7 +31,7 @@ impl LockerMockUpInterface for Store {
&self,
card_id: &str,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::LockerMockUp::find_by_card_id(&conn, card_id)
.await
.map_err(Into::into)
@ -42,7 +42,7 @@ impl LockerMockUpInterface for Store {
&self,
new: storage::LockerMockUpNew,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -50,7 +50,7 @@ impl LockerMockUpInterface for Store {
&self,
card_id: &str,
) -> CustomResult<storage::LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::LockerMockUp::delete_by_card_id(&conn, card_id)
.await
.map_err(Into::into)

View File

@ -41,7 +41,7 @@ impl MandateInterface for Store {
merchant_id: &str,
mandate_id: &str,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id)
.await
.map_err(Into::into)
@ -53,7 +53,7 @@ impl MandateInterface for Store {
merchant_id: &str,
customer_id: &str,
) -> CustomResult<Vec<storage::Mandate>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id)
.await
.map_err(Into::into)
@ -66,7 +66,7 @@ impl MandateInterface for Store {
mandate_id: &str,
mandate: storage::MandateUpdate,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate)
.await
.map_err(Into::into)
@ -77,7 +77,7 @@ impl MandateInterface for Store {
&self,
mandate: storage::MandateNew,
) -> CustomResult<storage::Mandate, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
mandate
.insert(&conn)
.await

View File

@ -54,7 +54,7 @@ impl MerchantAccountInterface for Store {
&self,
merchant_account: storage::MerchantAccountNew,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
merchant_account
.insert(&conn)
.await
@ -67,7 +67,7 @@ impl MerchantAccountInterface for Store {
merchant_id: &str,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let fetch_func = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantAccount::find_by_merchant_id(&conn, merchant_id)
.await
.map_err(Into::into)
@ -92,7 +92,7 @@ impl MerchantAccountInterface for Store {
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let _merchant_id = this.merchant_id.clone();
let update_func = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, merchant_account)
.await
.map_err(Into::into)
@ -116,7 +116,7 @@ impl MerchantAccountInterface for Store {
merchant_account: storage::MerchantAccountUpdate,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let update_func = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantAccount::update_with_specific_fields(
&conn,
merchant_id,
@ -142,7 +142,7 @@ impl MerchantAccountInterface for Store {
&self,
api_key: &str,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantAccount::find_by_api_key(&conn, api_key)
.await
.map_err(Into::into)
@ -153,7 +153,7 @@ impl MerchantAccountInterface for Store {
&self,
publishable_key: &str,
) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantAccount::find_by_publishable_key(&conn, publishable_key)
.await
.map_err(Into::into)
@ -165,7 +165,7 @@ impl MerchantAccountInterface for Store {
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let delete_func = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantAccount::delete_by_merchant_id(&conn, merchant_id)
.await
.map_err(Into::into)

View File

@ -140,7 +140,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_id: &str,
connector: &str,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_connector(
&conn,
merchant_id,
@ -157,7 +157,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_id: &str,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let find_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
@ -182,7 +182,7 @@ impl MerchantConnectorAccountInterface for Store {
&self,
t: storage::MerchantConnectorAccountNew,
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
t.insert(&conn).await.map_err(Into::into).into_report()
}
@ -190,7 +190,7 @@ impl MerchantConnectorAccountInterface for Store {
&self,
merchant_id: &str,
) -> CustomResult<Vec<storage::MerchantConnectorAccount>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantConnectorAccount::find_by_merchant_id(&conn, merchant_id)
.await
.map_err(Into::into)
@ -204,7 +204,7 @@ impl MerchantConnectorAccountInterface for Store {
) -> CustomResult<storage::MerchantConnectorAccount, errors::StorageError> {
let _merchant_connector_id = this.merchant_connector_id.clone();
let update_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, merchant_connector_account)
.await
.map_err(Into::into)
@ -227,7 +227,7 @@ impl MerchantConnectorAccountInterface for Store {
merchant_id: &str,
merchant_connector_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,

View File

@ -75,7 +75,7 @@ mod storage {
payment_attempt: PaymentAttemptNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
payment_attempt
.insert(&conn)
.await
@ -89,7 +89,7 @@ mod storage {
payment_attempt: PaymentAttemptUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, payment_attempt)
.await
.map_err(Into::into)
@ -102,7 +102,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -116,7 +116,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
@ -134,7 +134,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id(
&conn,
payment_id,
@ -151,7 +151,7 @@ mod storage {
connector_txn_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
@ -168,7 +168,7 @@ mod storage {
attempt_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
@ -336,7 +336,7 @@ mod storage {
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
payment_attempt
.insert(&conn)
.await
@ -398,7 +398,7 @@ mod storage {
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
//Reverse lookup for attempt_id
ReverseLookupNew {
@ -446,7 +446,7 @@ mod storage {
) -> CustomResult<PaymentAttempt, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, payment_attempt)
.await
.map_err(Into::into)
@ -470,7 +470,7 @@ mod storage {
.map(|_| updated_attempt)
.change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
// Reverse lookup for connector_transaction_id
if let (None, Some(connector_transaction_id)) = (
old_connector_transaction_id,
@ -522,7 +522,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -562,7 +562,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
&conn,
connector_transaction_id,
@ -624,7 +624,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_merchant_id_connector_txn_id(
&conn,
merchant_id,
@ -664,7 +664,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentAttempt, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
.await
.map_err(Into::into)

View File

@ -63,7 +63,7 @@ mod storage {
) -> CustomResult<PaymentIntent, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -135,7 +135,7 @@ mod storage {
) -> CustomResult<PaymentIntent, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, payment_intent)
.await
.map_err(Into::into)
@ -191,7 +191,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -222,7 +222,7 @@ mod storage {
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.replica_pool).await;
let conn = pg_connection(&self.replica_pool).await?;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)
@ -256,7 +256,7 @@ mod storage {
new: PaymentIntentNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -266,7 +266,7 @@ mod storage {
payment_intent: PaymentIntentUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, payment_intent)
.await
.map_err(Into::into)
@ -279,7 +279,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -293,7 +293,7 @@ mod storage {
pc: &api::PaymentListConstraints,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = pg_connection(&self.replica_pool).await;
let conn = pg_connection(&self.replica_pool).await?;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc)
.await
.map_err(Into::into)

View File

@ -38,7 +38,7 @@ impl PaymentMethodInterface for Store {
&self,
payment_method_id: &str,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::PaymentMethod::find_by_payment_method_id(&conn, payment_method_id)
.await
.map_err(Into::into)
@ -49,7 +49,7 @@ impl PaymentMethodInterface for Store {
&self,
m: storage::PaymentMethodNew,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
m.insert(&conn).await.map_err(Into::into).into_report()
}
@ -58,7 +58,7 @@ impl PaymentMethodInterface for Store {
customer_id: &str,
merchant_id: &str,
) -> CustomResult<Vec<storage::PaymentMethod>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id)
.await
.map_err(Into::into)
@ -70,7 +70,7 @@ impl PaymentMethodInterface for Store {
merchant_id: &str,
payment_method_id: &str,
) -> CustomResult<storage::PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::PaymentMethod::delete_by_merchant_id_payment_method_id(
&conn,
merchant_id,

View File

@ -58,7 +58,7 @@ impl ProcessTrackerInterface for Store {
&self,
id: &str,
) -> CustomResult<Option<storage::ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ProcessTracker::find_process_by_id(&conn, id)
.await
.map_err(Into::into)
@ -70,7 +70,7 @@ impl ProcessTrackerInterface for Store {
ids: Vec<String>,
schedule_time: PrimitiveDateTime,
) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ProcessTracker::reinitialize_limbo_processes(&conn, ids, schedule_time)
.await
.map_err(Into::into)
@ -84,7 +84,7 @@ impl ProcessTrackerInterface for Store {
status: enums::ProcessTrackerStatus,
limit: Option<i64>,
) -> CustomResult<Vec<storage::ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ProcessTracker::find_processes_by_time_status(
&conn,
time_lower_limit,
@ -101,7 +101,7 @@ impl ProcessTrackerInterface for Store {
&self,
new: storage::ProcessTrackerNew,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert_process(&conn)
.await
.map_err(Into::into)
@ -113,7 +113,7 @@ impl ProcessTrackerInterface for Store {
this: storage::ProcessTracker,
process: storage::ProcessTrackerUpdate,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, process)
.await
.map_err(Into::into)
@ -125,7 +125,7 @@ impl ProcessTrackerInterface for Store {
this: storage::ProcessTracker,
process: storage::ProcessTrackerUpdate,
) -> CustomResult<storage::ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, process)
.await
.map_err(Into::into)
@ -137,7 +137,7 @@ impl ProcessTrackerInterface for Store {
task_ids: Vec<String>,
task_update: storage::ProcessTrackerUpdate,
) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage::ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update)
.await
.map_err(Into::into)

View File

@ -86,7 +86,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
@ -102,7 +102,7 @@ mod storage {
new: storage_types::RefundNew,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
@ -112,7 +112,7 @@ mod storage {
connector_transaction_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_connector_transaction_id(
&conn,
merchant_id,
@ -129,7 +129,7 @@ mod storage {
refund: storage_types::RefundUpdate,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, refund)
.await
.map_err(Into::into)
@ -142,7 +142,7 @@ mod storage {
refund_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
@ -166,7 +166,7 @@ mod storage {
merchant_id: &str,
_storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id)
.await
.map_err(Into::into)
@ -181,7 +181,7 @@ mod storage {
_storage_scheme: enums::MerchantStorageScheme,
limit: i64,
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
let conn = pg_connection(&self.replica_pool).await;
let conn = pg_connection(&self.replica_pool).await?;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(
&conn,
merchant_id,
@ -220,7 +220,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_internal_reference_id_merchant_id(
&conn,
internal_reference_id,
@ -259,7 +259,7 @@ mod storage {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
new.insert(&conn).await.map_err(Into::into).into_report()
}
enums::MerchantStorageScheme::RedisKv => {
@ -309,7 +309,7 @@ mod storage {
})
.into_report(),
Ok(HsetnxReply::KeySet) => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
let reverse_lookups = vec![
storage_types::ReverseLookupNew {
@ -367,7 +367,7 @@ mod storage {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_connector_transaction_id(
&conn,
merchant_id,
@ -407,7 +407,7 @@ mod storage {
) -> CustomResult<storage_types::Refund, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
this.update(&conn, refund)
.await
.map_err(Into::into)
@ -467,7 +467,7 @@ mod storage {
storage_scheme: enums::MerchantStorageScheme,
) -> CustomResult<storage_types::Refund, errors::StorageError> {
let database_call = || async {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id)
.await
.map_err(Into::into)
@ -514,7 +514,7 @@ mod storage {
) -> CustomResult<Vec<storage_types::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool).await?;
storage_types::Refund::find_by_payment_id_merchant_id(
&conn,
payment_id,
@ -553,7 +553,7 @@ mod storage {
) -> CustomResult<Vec<storage_models::refund::Refund>, errors::StorageError> {
match storage_scheme {
enums::MerchantStorageScheme::PostgresOnly => {
let conn = pg_connection(&self.replica_pool).await;
let conn = pg_connection(&self.replica_pool).await?;
<storage_models::refund::Refund as storage_types::RefundDbExt>::filter_by_constraints(&conn, merchant_id, refund_details, limit)
.await
.map_err(Into::into)

View File

@ -1,3 +1,4 @@
use error_stack::ResultExt;
use storage_models::{errors, StorageResult};
use super::{MockDb, Store};
@ -15,12 +16,16 @@ pub trait ReverseLookupInterface {
#[async_trait::async_trait]
impl ReverseLookupInterface for Store {
async fn insert_reverse_lookup(&self, new: ReverseLookupNew) -> StorageResult<ReverseLookup> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool)
.await
.change_context(errors::DatabaseError::DatabaseConnectionError)?;
new.insert(&conn).await
}
async fn get_lookup_by_lookup_id(&self, id: &str) -> StorageResult<ReverseLookup> {
let conn = pg_connection(&self.master_pool).await;
let conn = pg_connection(&self.master_pool)
.await
.change_context(errors::DatabaseError::DatabaseConnectionError)?;
ReverseLookup::find_by_lookup_id(id, &conn).await
}
}