mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
refactor(storage): Add a separate crate to represent store implementations (#1853)
This commit is contained in:
1030
Cargo.lock
generated
1030
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -96,6 +96,7 @@ redis_interface = { version = "0.1.0", path = "../redis_interface" }
|
||||
router_derive = { version = "0.1.0", path = "../router_derive" }
|
||||
router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
|
||||
diesel_models = { version = "0.1.0", path = "../diesel_models", features = ["kv_store"] }
|
||||
storage_impl = { version = "0.1.0", path = "../storage_impl"}
|
||||
|
||||
[target.'cfg(not(target_os = "windows"))'.dependencies]
|
||||
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
|
||||
|
||||
@ -44,3 +44,23 @@ impl KmsDecrypt for settings::ActiveKmsSecrets {
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KmsDecrypt for settings::Database {
|
||||
type Output = storage_impl::config::Database;
|
||||
|
||||
async fn decrypt_inner(
|
||||
mut self,
|
||||
kms_client: &KmsClient,
|
||||
) -> CustomResult<Self::Output, KmsError> {
|
||||
Ok(storage_impl::config::Database {
|
||||
host: self.host,
|
||||
port: self.port,
|
||||
dbname: self.dbname,
|
||||
username: self.username,
|
||||
password: self.password.decrypt_inner(kms_client).await?.into(),
|
||||
pool_size: self.pool_size,
|
||||
connection_timeout: self.connection_timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -432,6 +432,21 @@ pub struct Database {
|
||||
pub connection_timeout: u64,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kms"))]
|
||||
impl Into<storage_impl::config::Database> for Database {
|
||||
fn into(self) -> storage_impl::config::Database {
|
||||
storage_impl::config::Database {
|
||||
username: self.username,
|
||||
password: self.password,
|
||||
host: self.host,
|
||||
port: self.port,
|
||||
dbname: self.dbname,
|
||||
pool_size: self.pool_size,
|
||||
connection_timeout: self.connection_timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
#[serde(default)]
|
||||
pub struct SupportedConnectors {
|
||||
|
||||
@ -84,7 +84,7 @@ pub async fn pg_connection_read(
|
||||
> {
|
||||
// If only OLAP is enabled get replica pool.
|
||||
#[cfg(all(feature = "olap", not(feature = "oltp")))]
|
||||
let pool = &store.replica_pool;
|
||||
let pool = &store.diesel_store.replica_pool;
|
||||
|
||||
// If either one of these are true we need to get master pool.
|
||||
// 1. Only OLTP is enabled.
|
||||
@ -95,7 +95,7 @@ pub async fn pg_connection_read(
|
||||
all(feature = "olap", feature = "oltp"),
|
||||
all(not(feature = "olap"), not(feature = "oltp"))
|
||||
))]
|
||||
let pool = &store.master_pool;
|
||||
let pool = &store.diesel_store.master_pool;
|
||||
|
||||
pool.get()
|
||||
.await
|
||||
@ -110,7 +110,7 @@ pub async fn pg_connection_write(
|
||||
errors::StorageError,
|
||||
> {
|
||||
// Since all writes should happen to master DB only choose master DB.
|
||||
let pool = &store.master_pool;
|
||||
let pool = &store.diesel_store.master_pool;
|
||||
|
||||
pool.get()
|
||||
.await
|
||||
|
||||
@ -11,6 +11,7 @@ use external_services::kms::{self, decrypt::KmsDecrypt};
|
||||
#[cfg(not(feature = "kms"))]
|
||||
use masking::PeekInterface;
|
||||
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
|
||||
use storage_impl::diesel as diesel_impl;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub use self::{api::*, encryption::*};
|
||||
@ -18,7 +19,6 @@ use crate::{
|
||||
async_spawn,
|
||||
cache::{CacheKind, ACCOUNTS_CACHE, CONFIG_CACHE},
|
||||
configs::settings,
|
||||
connection::{diesel_make_pg_pool, PgPool},
|
||||
consts,
|
||||
core::errors,
|
||||
};
|
||||
@ -129,9 +129,10 @@ impl RedisConnInterface for Store {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Store {
|
||||
pub master_pool: PgPool,
|
||||
#[cfg(not(feature = "olap"))]
|
||||
pub diesel_store: diesel_impl::store::Store,
|
||||
#[cfg(feature = "olap")]
|
||||
pub replica_pool: PgPool,
|
||||
pub diesel_store: diesel_impl::store::ReplicaStore,
|
||||
pub redis_conn: Arc<redis_interface::RedisConnectionPool>,
|
||||
#[cfg(feature = "kv_store")]
|
||||
pub(crate) config: StoreConfig,
|
||||
@ -178,20 +179,43 @@ impl Store {
|
||||
)
|
||||
.await;
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
Self {
|
||||
master_pool: diesel_make_pg_pool(
|
||||
&config.master_database,
|
||||
test_transaction,
|
||||
#[cfg(not(feature = "olap"))]
|
||||
diesel_store: diesel_impl::store::Store::new(
|
||||
#[cfg(not(feature = "kms"))]
|
||||
&config.master_database.clone().into(),
|
||||
#[cfg(feature = "kms")]
|
||||
kms_client,
|
||||
&config
|
||||
.master_database
|
||||
.clone()
|
||||
.decrypt_inner(kms_client)
|
||||
.await
|
||||
.expect("Failed to decrypt master database"),
|
||||
test_transaction,
|
||||
)
|
||||
.await,
|
||||
#[cfg(feature = "olap")]
|
||||
replica_pool: diesel_make_pg_pool(
|
||||
&config.replica_database,
|
||||
test_transaction,
|
||||
diesel_store: diesel_impl::store::ReplicaStore::new(
|
||||
#[cfg(not(feature = "kms"))]
|
||||
&config.master_database.clone().into(),
|
||||
#[cfg(feature = "kms")]
|
||||
kms_client,
|
||||
&config
|
||||
.master_database
|
||||
.clone()
|
||||
.decrypt_inner(kms_client)
|
||||
.await
|
||||
.expect("Failed to decrypt master database"),
|
||||
#[cfg(not(feature = "kms"))]
|
||||
&config.replica_database.clone().into(),
|
||||
#[cfg(feature = "kms")]
|
||||
&config
|
||||
.replica_database
|
||||
.clone()
|
||||
.decrypt_inner(kms_client)
|
||||
.await
|
||||
.expect("Failed to decrypt replica database"),
|
||||
test_transaction,
|
||||
)
|
||||
.await,
|
||||
redis_conn,
|
||||
|
||||
20
crates/storage_impl/Cargo.toml
Normal file
20
crates/storage_impl/Cargo.toml
Normal file
@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "storage_impl"
|
||||
description = "Storage backend implementations for data structures in router"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
readme = "README.md"
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
# First Party dependencies
|
||||
masking = { version = "0.1.0", path = "../masking" }
|
||||
|
||||
# Third party crates
|
||||
bb8 = "0.8.1"
|
||||
diesel = { version = "2.1.0", default-features = false, features = ["postgres"] }
|
||||
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "be3d9bce50051d8c0e0c06078e8066cc27db3001" }
|
||||
async-trait = "0.1.72"
|
||||
3
crates/storage_impl/README.md
Normal file
3
crates/storage_impl/README.md
Normal file
@ -0,0 +1,3 @@
|
||||
# Storage implementations
|
||||
|
||||
Storage backend implementations for data structures & objects.
|
||||
12
crates/storage_impl/src/config.rs
Normal file
12
crates/storage_impl/src/config.rs
Normal file
@ -0,0 +1,12 @@
|
||||
use masking::Secret;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Database {
|
||||
pub username: String,
|
||||
pub password: Secret<String>,
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub dbname: String,
|
||||
pub pool_size: u32,
|
||||
pub connection_timeout: u64,
|
||||
}
|
||||
1
crates/storage_impl/src/diesel.rs
Normal file
1
crates/storage_impl/src/diesel.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod store;
|
||||
84
crates/storage_impl/src/diesel/store.rs
Normal file
84
crates/storage_impl/src/diesel/store.rs
Normal file
@ -0,0 +1,84 @@
|
||||
use async_bb8_diesel::{AsyncConnection, ConnectionError};
|
||||
use bb8::CustomizeConnection;
|
||||
use diesel::PgConnection;
|
||||
use masking::PeekInterface;
|
||||
|
||||
use crate::config::Database;
|
||||
|
||||
pub type PgPool = bb8::Pool<async_bb8_diesel::ConnectionManager<PgConnection>>;
|
||||
pub type PgPooledConn = async_bb8_diesel::Connection<PgConnection>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Store {
|
||||
pub master_pool: PgPool,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
pub async fn new(config: &Database, test_transaction: bool) -> Self {
|
||||
Self {
|
||||
master_pool: diesel_make_pg_pool(config, test_transaction).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ReplicaStore {
|
||||
pub master_pool: PgPool,
|
||||
pub replica_pool: PgPool,
|
||||
}
|
||||
|
||||
impl ReplicaStore {
|
||||
pub async fn new(
|
||||
master_config: &Database,
|
||||
replica_config: &Database,
|
||||
test_transaction: bool,
|
||||
) -> Self {
|
||||
let master_pool = diesel_make_pg_pool(master_config, test_transaction).await;
|
||||
let replica_pool = diesel_make_pg_pool(replica_config, test_transaction).await;
|
||||
Self {
|
||||
master_pool,
|
||||
replica_pool,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
pub async fn diesel_make_pg_pool(database: &Database, test_transaction: bool) -> PgPool {
|
||||
let database_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
database.username,
|
||||
database.password.peek(),
|
||||
database.host,
|
||||
database.port,
|
||||
database.dbname
|
||||
);
|
||||
let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new(database_url);
|
||||
let mut pool = bb8::Pool::builder()
|
||||
.max_size(database.pool_size)
|
||||
.connection_timeout(std::time::Duration::from_secs(database.connection_timeout));
|
||||
|
||||
if test_transaction {
|
||||
pool = pool.connection_customizer(Box::new(TestTransaction));
|
||||
}
|
||||
|
||||
pool.build(manager)
|
||||
.await
|
||||
.expect("Failed to create PostgreSQL connection pool")
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestTransaction;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CustomizeConnection<PgPooledConn, ConnectionError> for TestTransaction {
|
||||
#[allow(clippy::unwrap_used)]
|
||||
async fn on_acquire(&self, conn: &mut PgPooledConn) -> Result<(), ConnectionError> {
|
||||
use diesel::Connection;
|
||||
|
||||
conn.run(|conn| {
|
||||
conn.begin_test_transaction().unwrap();
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
2
crates/storage_impl/src/lib.rs
Normal file
2
crates/storage_impl/src/lib.rs
Normal file
@ -0,0 +1,2 @@
|
||||
pub mod config;
|
||||
pub mod diesel;
|
||||
0
crates/storage_impl/src/redis.rs
Normal file
0
crates/storage_impl/src/redis.rs
Normal file
@ -5921,7 +5921,7 @@
|
||||
"description": "Will be used to expire client secret after certain amount of time to be supplied in seconds\n(900) for 15 mins",
|
||||
"example": 900,
|
||||
"nullable": true,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"organization_id": {
|
||||
"type": "string",
|
||||
@ -6215,7 +6215,7 @@
|
||||
"format": "int32",
|
||||
"description": "Will be used to expire client secret after certain amount of time to be supplied in seconds\n(900) for 15 mins",
|
||||
"nullable": true,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -6936,13 +6936,13 @@
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "Timestamp at which session is requested",
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"expires_at": {
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "Timestamp at which session expires",
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"merchant_session_identifier": {
|
||||
"type": "string",
|
||||
@ -6976,7 +6976,7 @@
|
||||
"type": "integer",
|
||||
"format": "int32",
|
||||
"description": "The number of retries to get the session response",
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"psp_id": {
|
||||
"type": "string",
|
||||
@ -7030,7 +7030,7 @@
|
||||
"format": "int32",
|
||||
"description": "The quantity of the product to be purchased",
|
||||
"example": 1,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -7053,7 +7053,7 @@
|
||||
"format": "int32",
|
||||
"description": "The quantity of the product to be purchased",
|
||||
"example": 1,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"amount": {
|
||||
"type": "integer",
|
||||
@ -7365,6 +7365,7 @@
|
||||
"$ref": "#/components/schemas/AuthenticationType"
|
||||
}
|
||||
],
|
||||
"default": "three_ds",
|
||||
"nullable": true
|
||||
},
|
||||
"cancellation_reason": {
|
||||
@ -7553,7 +7554,7 @@
|
||||
"size": {
|
||||
"type": "integer",
|
||||
"description": "The number of payments included in the list",
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"data": {
|
||||
"type": "array",
|
||||
@ -8204,7 +8205,7 @@
|
||||
"description": "The payment amount. Amount for the payment in lowest denomination of the currency. (i.e) in cents for USD denomination, in paisa for INR denomination etc.,",
|
||||
"example": 6540,
|
||||
"nullable": true,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"routing": {
|
||||
"allOf": [
|
||||
@ -8338,6 +8339,7 @@
|
||||
"$ref": "#/components/schemas/AuthenticationType"
|
||||
}
|
||||
],
|
||||
"default": "three_ds",
|
||||
"nullable": true
|
||||
},
|
||||
"payment_method_data": {
|
||||
@ -8538,7 +8540,7 @@
|
||||
"description": "The payment amount. Amount for the payment in lowest denomination of the currency. (i.e) in cents for USD denomination, in paisa for INR denomination etc.,",
|
||||
"example": 6540,
|
||||
"nullable": true,
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"routing": {
|
||||
"allOf": [
|
||||
@ -8672,6 +8674,7 @@
|
||||
"$ref": "#/components/schemas/AuthenticationType"
|
||||
}
|
||||
],
|
||||
"default": "three_ds",
|
||||
"nullable": true
|
||||
},
|
||||
"payment_method_data": {
|
||||
@ -8875,7 +8878,12 @@
|
||||
"maxLength": 255
|
||||
},
|
||||
"status": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/components/schemas/IntentStatus"
|
||||
}
|
||||
],
|
||||
"default": "requires_confirmation"
|
||||
},
|
||||
"amount": {
|
||||
"type": "integer",
|
||||
@ -8889,7 +8897,7 @@
|
||||
"description": "The maximum amount that could be captured from the payment",
|
||||
"example": 6540,
|
||||
"nullable": true,
|
||||
"minimum": 100.0
|
||||
"minimum": 100
|
||||
},
|
||||
"amount_received": {
|
||||
"type": "integer",
|
||||
@ -8897,7 +8905,7 @@
|
||||
"description": "The amount which is already captured from the payment",
|
||||
"example": 6540,
|
||||
"nullable": true,
|
||||
"minimum": 100.0
|
||||
"minimum": 100
|
||||
},
|
||||
"connector": {
|
||||
"type": "string",
|
||||
@ -9077,6 +9085,7 @@
|
||||
"$ref": "#/components/schemas/AuthenticationType"
|
||||
}
|
||||
],
|
||||
"default": "three_ds",
|
||||
"nullable": true
|
||||
},
|
||||
"statement_descriptor_name": {
|
||||
@ -9962,7 +9971,7 @@
|
||||
"size": {
|
||||
"type": "integer",
|
||||
"description": "The number of refunds included in the list",
|
||||
"minimum": 0.0
|
||||
"minimum": 0
|
||||
},
|
||||
"data": {
|
||||
"type": "array",
|
||||
@ -10007,7 +10016,7 @@
|
||||
"description": "Total amount for which the refund is to be initiated. Amount for the payment in lowest denomination of the currency. (i.e) in cents for USD denomination, in paisa for INR denomination etc., If not provided, this will default to the full payment amount",
|
||||
"example": 6540,
|
||||
"nullable": true,
|
||||
"minimum": 100.0
|
||||
"minimum": 100
|
||||
},
|
||||
"reason": {
|
||||
"type": "string",
|
||||
@ -10022,6 +10031,7 @@
|
||||
"$ref": "#/components/schemas/RefundType"
|
||||
}
|
||||
],
|
||||
"default": "Instant",
|
||||
"nullable": true
|
||||
},
|
||||
"metadata": {
|
||||
|
||||
Reference in New Issue
Block a user