refactor: Add OLAP feature for replica database (#4)

Co-authored-by: karthikey hegde <karthikey.hegde@karthikey.hegde-MacBookPro>
This commit is contained in:
Kartikeya Hegde
2022-11-22 18:17:12 +05:30
committed by GitHub
parent 4f74b65b71
commit d9768ccf4c
22 changed files with 94 additions and 78 deletions

View File

@ -10,7 +10,7 @@ log_format = "default"
enabled = false enabled = false
# TODO: Update database credentials before running application # TODO: Update database credentials before running application
[database] [master_database]
username = "db_user" username = "db_user"
password = "db_pass" password = "db_pass"
host = "localhost" host = "localhost"

View File

@ -16,7 +16,16 @@ request_body_limit = 16_384
# https_url = "https proxy url" # https_url = "https proxy url"
# Main SQL data store credentials # Main SQL data store credentials
[database] [master_database]
username = "db_user" # DB Username
password = "db_pass" # DB Password
host = "localhost" # DB Host
port = 5432 # DB Port
dbname = "orca_db" # Name of Database
pool_size = 5 # Number of connections to keep open
# Replica SQL data store credentials
[replica_database]
username = "db_user" # DB Username username = "db_user" # DB Username
password = "db_pass" # DB Password password = "db_pass" # DB Password
host = "localhost" # DB Host host = "localhost" # DB Host
@ -104,4 +113,4 @@ batch_size = 200 # Specifies the batch size the producer will push under a singl
# Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database # Drainer configuration, which handles draining raw SQL queries from Redis streams to the SQL database
[drainer] [drainer]
stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer stream_name = "DRAINER_STREAM" # Specifies the stream name to be used by the drainer
num_partitions = 64 # Specifies the number of partitions the stream will be divided into num_partitions = 64 # Specifies the number of partitions the stream will be divided into

View File

@ -16,7 +16,7 @@ level = "DEBUG" # What you see in your terminal.
[log.telemetry] [log.telemetry]
enabled = false # Whether tracing/telemetry is enabled. enabled = false # Whether tracing/telemetry is enabled.
[database] [master_database]
username = "db_user" username = "db_user"
password = "db_pass" password = "db_pass"
host = "pg" host = "pg"

View File

@ -13,6 +13,7 @@ default = []
kms = ["aws-config", "aws-sdk-kms"] kms = ["aws-config", "aws-sdk-kms"]
stripe = ["dep:serde_qs"] stripe = ["dep:serde_qs"]
sandbox = ["kms", "stripe"] sandbox = ["kms", "stripe"]
olap = []
production = [] production = []
kv_store = [] kv_store = []

View File

@ -9,7 +9,7 @@ request_body_limit = 16_384 # Post request body is limited to 16k.
# http_url = "" # http_url = ""
# https_url = "" # https_url = ""
[database] [master_database]
username = "none" username = "none"
password = "none" password = "none"
host = "localhost" host = "localhost"

View File

@ -23,7 +23,9 @@ pub struct Settings {
pub server: Server, pub server: Server,
pub proxy: Proxy, pub proxy: Proxy,
pub env: Env, pub env: Env,
pub database: Database, pub master_database: Database,
#[cfg(feature = "olap")]
pub replica_database: Database,
pub redis: Redis, pub redis: Redis,
pub log: Log, pub log: Log,
pub keys: Keys, pub keys: Keys,

View File

@ -22,7 +22,7 @@ pub trait IAddress {
#[async_trait::async_trait] #[async_trait::async_trait]
impl IAddress for Store { impl IAddress for Store {
async fn find_address(&self, address_id: &str) -> CustomResult<Address, errors::StorageError> { async fn find_address(&self, address_id: &str) -> CustomResult<Address, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Address::find_by_address_id(&conn, address_id).await Address::find_by_address_id(&conn, address_id).await
} }
@ -31,7 +31,7 @@ impl IAddress for Store {
address_id: String, address_id: String,
address: AddressUpdate, address: AddressUpdate,
) -> CustomResult<Address, errors::StorageError> { ) -> CustomResult<Address, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Address::update_by_address_id(&conn, address_id, address).await Address::update_by_address_id(&conn, address_id, address).await
} }
@ -39,7 +39,7 @@ impl IAddress for Store {
&self, &self,
address: AddressNew, address: AddressNew,
) -> CustomResult<Address, errors::StorageError> { ) -> CustomResult<Address, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
address.insert(&conn).await address.insert(&conn).await
} }
} }

View File

@ -21,12 +21,12 @@ pub trait IConfig {
#[async_trait::async_trait] #[async_trait::async_trait]
impl IConfig for Store { impl IConfig for Store {
async fn insert_config(&self, config: ConfigNew) -> CustomResult<Config, errors::StorageError> { async fn insert_config(&self, config: ConfigNew) -> CustomResult<Config, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
config.insert(&conn).await config.insert(&conn).await
} }
async fn find_config_by_key(&self, key: &str) -> CustomResult<Config, errors::StorageError> { async fn find_config_by_key(&self, key: &str) -> CustomResult<Config, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Config::find_by_key(&conn, key).await Config::find_by_key(&conn, key).await
} }
@ -35,7 +35,7 @@ impl IConfig for Store {
key: &str, key: &str,
config_update: ConfigUpdate, config_update: ConfigUpdate,
) -> CustomResult<Config, errors::StorageError> { ) -> CustomResult<Config, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Config::update_by_key(&conn, key, config_update).await Config::update_by_key(&conn, key, config_update).await
} }
} }

View File

@ -30,7 +30,7 @@ impl IConnectorResponse for Store {
&self, &self,
connector_response: ConnectorResponseNew, connector_response: ConnectorResponseNew,
) -> CustomResult<ConnectorResponse, errors::StorageError> { ) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
connector_response.insert(&conn).await connector_response.insert(&conn).await
} }
@ -40,7 +40,7 @@ impl IConnectorResponse for Store {
merchant_id: &str, merchant_id: &str,
txn_id: &str, txn_id: &str,
) -> CustomResult<ConnectorResponse, errors::StorageError> { ) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
ConnectorResponse::find_by_payment_id_and_merchant_id_transaction_id( ConnectorResponse::find_by_payment_id_and_merchant_id_transaction_id(
&conn, &conn,
payment_id, payment_id,
@ -55,7 +55,7 @@ impl IConnectorResponse for Store {
this: ConnectorResponse, this: ConnectorResponse,
connector_response_update: ConnectorResponseUpdate, connector_response_update: ConnectorResponseUpdate,
) -> CustomResult<ConnectorResponse, errors::StorageError> { ) -> CustomResult<ConnectorResponse, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, connector_response_update).await this.update(&conn, connector_response_update).await
} }
} }

View File

@ -48,7 +48,7 @@ impl ICustomer for Store {
customer_id: &str, customer_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Option<Customer>, errors::StorageError> { ) -> CustomResult<Option<Customer>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Customer::find_optional_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await Customer::find_optional_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await
} }
@ -58,7 +58,7 @@ impl ICustomer for Store {
merchant_id: String, merchant_id: String,
customer: CustomerUpdate, customer: CustomerUpdate,
) -> CustomResult<Customer, errors::StorageError> { ) -> CustomResult<Customer, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Customer::update_by_customer_id_merchant_id(&conn, customer_id, merchant_id, customer).await Customer::update_by_customer_id_merchant_id(&conn, customer_id, merchant_id, customer).await
} }
@ -67,7 +67,7 @@ impl ICustomer for Store {
customer_id: &str, customer_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Customer, errors::StorageError> { ) -> CustomResult<Customer, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Customer::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await Customer::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await
} }
@ -75,7 +75,7 @@ impl ICustomer for Store {
&self, &self,
customer_data: CustomerNew, customer_data: CustomerNew,
) -> CustomResult<Customer, errors::StorageError> { ) -> CustomResult<Customer, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
customer_data.insert(&conn).await customer_data.insert(&conn).await
} }
@ -84,7 +84,7 @@ impl ICustomer for Store {
customer_id: &str, customer_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> { ) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await Customer::delete_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await
} }
} }

View File

@ -13,7 +13,7 @@ pub trait IEvent {
#[async_trait::async_trait] #[async_trait::async_trait]
impl IEvent for Store { impl IEvent for Store {
async fn insert_event(&self, event: EventNew) -> CustomResult<Event, errors::StorageError> { async fn insert_event(&self, event: EventNew) -> CustomResult<Event, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
event.insert(&conn).await event.insert(&conn).await
} }
} }

View File

@ -24,7 +24,7 @@ impl ILockerMockUp for Store {
&self, &self,
card_id: &str, card_id: &str,
) -> CustomResult<LockerMockUp, errors::StorageError> { ) -> CustomResult<LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
LockerMockUp::find_by_card_id(&conn, card_id).await LockerMockUp::find_by_card_id(&conn, card_id).await
} }
@ -32,7 +32,7 @@ impl ILockerMockUp for Store {
&self, &self,
new: LockerMockUpNew, new: LockerMockUpNew,
) -> CustomResult<LockerMockUp, errors::StorageError> { ) -> CustomResult<LockerMockUp, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
new.insert(&conn).await new.insert(&conn).await
} }
} }

View File

@ -39,7 +39,7 @@ impl IMandate for Store {
merchant_id: &str, merchant_id: &str,
mandate_id: &str, mandate_id: &str,
) -> CustomResult<Mandate, errors::StorageError> { ) -> CustomResult<Mandate, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id).await Mandate::find_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id).await
} }
@ -48,7 +48,7 @@ impl IMandate for Store {
merchant_id: &str, merchant_id: &str,
customer_id: &str, customer_id: &str,
) -> CustomResult<Vec<Mandate>, errors::StorageError> { ) -> CustomResult<Vec<Mandate>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id).await Mandate::find_by_merchant_id_customer_id(&conn, merchant_id, customer_id).await
} }
@ -58,7 +58,7 @@ impl IMandate for Store {
mandate_id: &str, mandate_id: &str,
mandate: MandateUpdate, mandate: MandateUpdate,
) -> CustomResult<Mandate, errors::StorageError> { ) -> CustomResult<Mandate, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate).await Mandate::update_by_merchant_id_mandate_id(&conn, merchant_id, mandate_id, mandate).await
} }
@ -66,7 +66,7 @@ impl IMandate for Store {
&self, &self,
mandate: MandateNew, mandate: MandateNew,
) -> CustomResult<Mandate, errors::StorageError> { ) -> CustomResult<Mandate, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
mandate.insert(&conn).await mandate.insert(&conn).await
} }
} }

View File

@ -45,7 +45,7 @@ impl IMerchantAccount for Store {
&self, &self,
merchant_account: MerchantAccountNew, merchant_account: MerchantAccountNew,
) -> CustomResult<MerchantAccount, errors::StorageError> { ) -> CustomResult<MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
merchant_account.insert(&conn).await merchant_account.insert(&conn).await
} }
@ -53,7 +53,7 @@ impl IMerchantAccount for Store {
&self, &self,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<MerchantAccount, errors::StorageError> { ) -> CustomResult<MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantAccount::find_by_merchant_id(&conn, merchant_id).await MerchantAccount::find_by_merchant_id(&conn, merchant_id).await
} }
@ -62,7 +62,7 @@ impl IMerchantAccount for Store {
this: MerchantAccount, this: MerchantAccount,
merchant_account: MerchantAccountUpdate, merchant_account: MerchantAccountUpdate,
) -> CustomResult<MerchantAccount, errors::StorageError> { ) -> CustomResult<MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, merchant_account).await this.update(&conn, merchant_account).await
} }
@ -70,7 +70,7 @@ impl IMerchantAccount for Store {
&self, &self,
api_key: &str, api_key: &str,
) -> CustomResult<MerchantAccount, errors::StorageError> { ) -> CustomResult<MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantAccount::find_by_api_key(&conn, api_key).await MerchantAccount::find_by_api_key(&conn, api_key).await
} }
@ -78,7 +78,7 @@ impl IMerchantAccount for Store {
&self, &self,
publishable_key: &str, publishable_key: &str,
) -> CustomResult<MerchantAccount, errors::StorageError> { ) -> CustomResult<MerchantAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantAccount::find_by_publishable_key(&conn, publishable_key).await MerchantAccount::find_by_publishable_key(&conn, publishable_key).await
} }
@ -86,7 +86,7 @@ impl IMerchantAccount for Store {
&self, &self,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> { ) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantAccount::delete_by_merchant_id(&conn, merchant_id).await MerchantAccount::delete_by_merchant_id(&conn, merchant_id).await
} }
} }

View File

@ -51,7 +51,7 @@ impl IMerchantConnectorAccount for Store {
merchant_id: &str, merchant_id: &str,
connector: &str, connector: &str,
) -> CustomResult<MerchantConnectorAccount, errors::StorageError> { ) -> CustomResult<MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantConnectorAccount::find_by_merchant_id_connector(&conn, merchant_id, connector).await MerchantConnectorAccount::find_by_merchant_id_connector(&conn, merchant_id, connector).await
} }
@ -60,7 +60,7 @@ impl IMerchantConnectorAccount for Store {
merchant_id: &str, merchant_id: &str,
merchant_connector_id: &i32, merchant_connector_id: &i32,
) -> CustomResult<MerchantConnectorAccount, errors::StorageError> { ) -> CustomResult<MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id( MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn, &conn,
merchant_id, merchant_id,
@ -73,7 +73,7 @@ impl IMerchantConnectorAccount for Store {
&self, &self,
t: MerchantConnectorAccountNew, t: MerchantConnectorAccountNew,
) -> CustomResult<MerchantConnectorAccount, errors::StorageError> { ) -> CustomResult<MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
t.insert(&conn).await t.insert(&conn).await
} }
@ -81,7 +81,7 @@ impl IMerchantConnectorAccount for Store {
&self, &self,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Vec<MerchantConnectorAccount>, errors::StorageError> { ) -> CustomResult<Vec<MerchantConnectorAccount>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantConnectorAccount::find_by_merchant_id(&conn, merchant_id).await MerchantConnectorAccount::find_by_merchant_id(&conn, merchant_id).await
} }
@ -90,7 +90,7 @@ impl IMerchantConnectorAccount for Store {
this: MerchantConnectorAccount, this: MerchantConnectorAccount,
merchant_connector_account: MerchantConnectorAccountUpdate, merchant_connector_account: MerchantConnectorAccountUpdate,
) -> CustomResult<MerchantConnectorAccount, errors::StorageError> { ) -> CustomResult<MerchantConnectorAccount, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, merchant_connector_account).await this.update(&conn, merchant_connector_account).await
} }
@ -99,7 +99,7 @@ impl IMerchantConnectorAccount for Store {
merchant_id: &str, merchant_id: &str,
merchant_connector_id: &i32, merchant_connector_id: &i32,
) -> CustomResult<bool, errors::StorageError> { ) -> CustomResult<bool, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id( MerchantConnectorAccount::delete_by_merchant_id_merchant_connector_id(
&conn, &conn,
merchant_id, merchant_id,

View File

@ -64,7 +64,7 @@ mod storage {
&self, &self,
payment_attempt: PaymentAttemptNew, payment_attempt: PaymentAttemptNew,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
payment_attempt.insert(&conn).await payment_attempt.insert(&conn).await
} }
@ -73,7 +73,7 @@ mod storage {
this: PaymentAttempt, this: PaymentAttempt,
payment_attempt: PaymentAttemptUpdate, payment_attempt: PaymentAttemptUpdate,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, payment_attempt).await this.update(&conn, payment_attempt).await
} }
@ -82,7 +82,7 @@ mod storage {
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await PaymentAttempt::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
} }
@ -92,7 +92,7 @@ mod storage {
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentAttempt::find_by_transaction_id_payment_id_merchant_id( PaymentAttempt::find_by_transaction_id_payment_id_merchant_id(
&conn, &conn,
transaction_id, transaction_id,
@ -107,7 +107,7 @@ mod storage {
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id( PaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id(
&conn, &conn,
payment_id, payment_id,
@ -121,7 +121,7 @@ mod storage {
merchant_id: &str, merchant_id: &str,
connector_txn_id: &str, connector_txn_id: &str,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
// TODO: update logic to lookup all payment attempts for an intent // TODO: update logic to lookup all payment attempts for an intent
// and apply filter logic on top of them to get the desired one. // and apply filter logic on top of them to get the desired one.
PaymentAttempt::find_by_merchant_id_connector_txn_id( PaymentAttempt::find_by_merchant_id_connector_txn_id(
@ -137,7 +137,7 @@ mod storage {
merchant_id: &str, merchant_id: &str,
txn_id: &str, txn_id: &str,
) -> CustomResult<PaymentAttempt, errors::StorageError> { ) -> CustomResult<PaymentAttempt, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await PaymentAttempt::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await
} }
@ -215,7 +215,7 @@ mod storage {
))) )))
.into_report(), .into_report(),
Ok(1) => { Ok(1) => {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
let query = payment_attempt let query = payment_attempt
.insert(&conn) .insert(&conn)
.await .await
@ -268,7 +268,7 @@ mod storage {
.into_report() .into_report()
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
let query = this let query = this
.update(&conn, payment_attempt) .update(&conn, payment_attempt)
.await .await

View File

@ -92,7 +92,7 @@ mod storage {
))) )))
.into_report(), .into_report(),
Ok(1) => { Ok(1) => {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
let query = new let query = new
.insert(&conn) .insert(&conn)
.await .await
@ -145,7 +145,7 @@ mod storage {
.into_report() .into_report()
.change_context(errors::StorageError::KVError)?; .change_context(errors::StorageError::KVError)?;
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
let query = this let query = this
.update(&conn, payment_intent) .update(&conn, payment_intent)
.await .await
@ -220,7 +220,7 @@ mod storage {
&self, &self,
new: PaymentIntentNew, new: PaymentIntentNew,
) -> CustomResult<PaymentIntent, errors::StorageError> { ) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
new.insert(&conn).await new.insert(&conn).await
} }
@ -229,7 +229,7 @@ mod storage {
this: PaymentIntent, this: PaymentIntent,
payment_intent: PaymentIntentUpdate, payment_intent: PaymentIntentUpdate,
) -> CustomResult<PaymentIntent, errors::StorageError> { ) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, payment_intent).await this.update(&conn, payment_intent).await
} }
@ -238,7 +238,7 @@ mod storage {
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<PaymentIntent, errors::StorageError> { ) -> CustomResult<PaymentIntent, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await PaymentIntent::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
} }
@ -247,7 +247,7 @@ mod storage {
merchant_id: &str, merchant_id: &str,
pc: &api::PaymentListConstraints, pc: &api::PaymentListConstraints,
) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> { ) -> CustomResult<Vec<PaymentIntent>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await PaymentIntent::filter_by_constraints(&conn, merchant_id, pc).await
} }
} }

View File

@ -36,7 +36,7 @@ impl IPaymentMethod for Store {
&self, &self,
payment_method_id: &str, payment_method_id: &str,
) -> CustomResult<PaymentMethod, errors::StorageError> { ) -> CustomResult<PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentMethod::find_by_payment_method_id(&conn, payment_method_id).await PaymentMethod::find_by_payment_method_id(&conn, payment_method_id).await
} }
@ -44,7 +44,7 @@ impl IPaymentMethod for Store {
&self, &self,
m: PaymentMethodNew, m: PaymentMethodNew,
) -> CustomResult<PaymentMethod, errors::StorageError> { ) -> CustomResult<PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
m.insert(&conn).await m.insert(&conn).await
} }
@ -53,7 +53,7 @@ impl IPaymentMethod for Store {
customer_id: &str, customer_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Vec<PaymentMethod>, errors::StorageError> { ) -> CustomResult<Vec<PaymentMethod>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await PaymentMethod::find_by_customer_id_merchant_id(&conn, customer_id, merchant_id).await
} }
@ -62,7 +62,7 @@ impl IPaymentMethod for Store {
merchant_id: &str, merchant_id: &str,
payment_method_id: &str, payment_method_id: &str,
) -> CustomResult<PaymentMethod, errors::StorageError> { ) -> CustomResult<PaymentMethod, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
PaymentMethod::delete_by_merchant_id_payment_method_id( PaymentMethod::delete_by_merchant_id_payment_method_id(
&conn, &conn,
merchant_id, merchant_id,

View File

@ -57,7 +57,7 @@ impl IProcessTracker for Store {
&self, &self,
id: &str, id: &str,
) -> CustomResult<Option<ProcessTracker>, errors::StorageError> { ) -> CustomResult<Option<ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
ProcessTracker::find_process_by_id(&conn, id).await ProcessTracker::find_process_by_id(&conn, id).await
} }
@ -66,7 +66,7 @@ impl IProcessTracker for Store {
ids: Vec<String>, ids: Vec<String>,
schedule_time: PrimitiveDateTime, schedule_time: PrimitiveDateTime,
) -> CustomResult<usize, errors::StorageError> { ) -> CustomResult<usize, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
ProcessTracker::reinitialize_limbo_processes(&conn, ids, schedule_time).await ProcessTracker::reinitialize_limbo_processes(&conn, ids, schedule_time).await
} }
@ -77,7 +77,7 @@ impl IProcessTracker for Store {
status: enums::ProcessTrackerStatus, status: enums::ProcessTrackerStatus,
limit: Option<i64>, limit: Option<i64>,
) -> CustomResult<Vec<ProcessTracker>, errors::StorageError> { ) -> CustomResult<Vec<ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
ProcessTracker::find_processes_by_time_status( ProcessTracker::find_processes_by_time_status(
&conn, &conn,
time_lower_limit, time_lower_limit,
@ -92,7 +92,7 @@ impl IProcessTracker for Store {
&self, &self,
new: ProcessTrackerNew, new: ProcessTrackerNew,
) -> CustomResult<ProcessTracker, errors::StorageError> { ) -> CustomResult<ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
new.insert_process(&conn).await new.insert_process(&conn).await
} }
@ -101,7 +101,7 @@ impl IProcessTracker for Store {
this: ProcessTracker, this: ProcessTracker,
process: ProcessTrackerUpdate, process: ProcessTrackerUpdate,
) -> CustomResult<ProcessTracker, errors::StorageError> { ) -> CustomResult<ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, process).await this.update(&conn, process).await
} }
@ -110,7 +110,7 @@ impl IProcessTracker for Store {
this: ProcessTracker, this: ProcessTracker,
process: ProcessTrackerUpdate, process: ProcessTrackerUpdate,
) -> CustomResult<ProcessTracker, errors::StorageError> { ) -> CustomResult<ProcessTracker, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, process).await this.update(&conn, process).await
} }
@ -119,7 +119,7 @@ impl IProcessTracker for Store {
task_ids: Vec<String>, task_ids: Vec<String>,
task_update: ProcessTrackerUpdate, task_update: ProcessTrackerUpdate,
) -> CustomResult<Vec<ProcessTracker>, errors::StorageError> { ) -> CustomResult<Vec<ProcessTracker>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update).await ProcessTracker::update_process_status_by_ids(&conn, task_ids, task_update).await
} }
} }

View File

@ -54,13 +54,13 @@ impl IRefund for Store {
internal_reference_id: &str, internal_reference_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Refund, errors::StorageError> { ) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Refund::find_by_internal_reference_id_merchant_id(&conn, internal_reference_id, merchant_id) Refund::find_by_internal_reference_id_merchant_id(&conn, internal_reference_id, merchant_id)
.await .await
} }
async fn insert_refund(&self, new: RefundNew) -> CustomResult<Refund, errors::StorageError> { async fn insert_refund(&self, new: RefundNew) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
new.insert(&conn).await new.insert(&conn).await
} }
async fn find_refund_by_merchant_id_transaction_id( async fn find_refund_by_merchant_id_transaction_id(
@ -68,7 +68,7 @@ impl IRefund for Store {
merchant_id: &str, merchant_id: &str,
txn_id: &str, txn_id: &str,
) -> CustomResult<Vec<Refund>, errors::StorageError> { ) -> CustomResult<Vec<Refund>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await Refund::find_by_merchant_id_transaction_id(&conn, merchant_id, txn_id).await
} }
@ -77,7 +77,7 @@ impl IRefund for Store {
this: Refund, this: Refund,
refund: RefundUpdate, refund: RefundUpdate,
) -> CustomResult<Refund, errors::StorageError> { ) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
this.update(&conn, refund).await this.update(&conn, refund).await
} }
@ -86,7 +86,7 @@ impl IRefund for Store {
merchant_id: &str, merchant_id: &str,
refund_id: &str, refund_id: &str,
) -> CustomResult<Refund, errors::StorageError> { ) -> CustomResult<Refund, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id).await Refund::find_by_merchant_id_refund_id(&conn, merchant_id, refund_id).await
} }
@ -96,7 +96,7 @@ impl IRefund for Store {
// merchant_id: &str, // merchant_id: &str,
// refund_id: &str, // refund_id: &str,
// ) -> CustomResult<Refund, errors::StorageError> { // ) -> CustomResult<Refund, errors::StorageError> {
// let conn = pg_connection(&self.pg_pool.conn).await; // let conn = pg_connection(&self.master_pool.conn).await;
// Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id) // Refund::find_by_payment_id_merchant_id_refund_id(&conn, payment_id, merchant_id, refund_id)
// .await // .await
// } // }
@ -106,7 +106,7 @@ impl IRefund for Store {
payment_id: &str, payment_id: &str,
merchant_id: &str, merchant_id: &str,
) -> CustomResult<Vec<Refund>, errors::StorageError> { ) -> CustomResult<Vec<Refund>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await Refund::find_by_payment_id_merchant_id(&conn, payment_id, merchant_id).await
} }
} }

View File

@ -34,7 +34,7 @@ impl ITempCard for Store {
&self, &self,
address: TempCardNew, address: TempCardNew,
) -> CustomResult<TempCard, errors::StorageError> { ) -> CustomResult<TempCard, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
address.insert(&conn).await address.insert(&conn).await
} }
@ -42,7 +42,7 @@ impl ITempCard for Store {
&self, &self,
transaction_id: &str, transaction_id: &str,
) -> CustomResult<Option<TempCard>, errors::StorageError> { ) -> CustomResult<Option<TempCard>, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
TempCard::find_by_transaction_id(&conn, transaction_id).await TempCard::find_by_transaction_id(&conn, transaction_id).await
} }
@ -50,7 +50,7 @@ impl ITempCard for Store {
&self, &self,
card: TempCard, card: TempCard,
) -> CustomResult<TempCard, errors::StorageError> { ) -> CustomResult<TempCard, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
TempCard::insert_with_token(card, &conn).await TempCard::insert_with_token(card, &conn).await
} }
@ -58,7 +58,7 @@ impl ITempCard for Store {
&self, &self,
token: &i32, token: &i32,
) -> CustomResult<TempCard, errors::StorageError> { ) -> CustomResult<TempCard, errors::StorageError> {
let conn = pg_connection(&self.pg_pool.conn).await; let conn = pg_connection(&self.master_pool.conn).await;
TempCard::find_by_token(&conn, token).await TempCard::find_by_token(&conn, token).await
} }
} }

View File

@ -9,7 +9,9 @@ pub use self::{api::*, encryption::*};
#[derive(Clone)] #[derive(Clone)]
pub struct Store { pub struct Store {
pub pg_pool: crate::db::SqlDb, pub master_pool: crate::db::SqlDb,
#[cfg(feature = "olap")]
pub replica_pool: crate::db::SqlDb,
pub redis_conn: Arc<crate::services::redis::RedisConnectionPool>, pub redis_conn: Arc<crate::services::redis::RedisConnectionPool>,
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
pub(crate) config: StoreConfig, pub(crate) config: StoreConfig,
@ -25,7 +27,9 @@ pub(crate) struct StoreConfig {
impl Store { impl Store {
pub async fn new(config: &crate::configs::settings::Settings) -> Self { pub async fn new(config: &crate::configs::settings::Settings) -> Self {
Self { Self {
pg_pool: crate::db::SqlDb::new(&config.database).await, master_pool: crate::db::SqlDb::new(&config.master_database).await,
#[cfg(feature = "olap")]
replica_pool: crate::db::SqlDb::new(&config.replica_database).await,
redis_conn: Arc::new(crate::connection::redis_connection(config).await), redis_conn: Arc::new(crate::connection::redis_connection(config).await),
#[cfg(feature = "kv_store")] #[cfg(feature = "kv_store")]
config: StoreConfig { config: StoreConfig {