From 0ba4ccfc8b38a918a56eab66715005b4c448172b Mon Sep 17 00:00:00 2001 From: Hrithikesh <61539176+hrithikesh026@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:11:06 +0530 Subject: [PATCH] feat(core): introduce accounts schema for accounts related tables (#7113) Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> --- config/config.example.toml | 1 + config/deployments/env_specific.toml | 1 + config/development.toml | 1 + config/docker_compose.toml | 3 +- crates/diesel_models/src/organization.rs | 4 +- crates/drainer/src/settings.rs | 3 + crates/router/src/configs/settings.rs | 108 +++++++++++++++++- crates/router/src/connection.rs | 40 +++++++ crates/router/src/core/admin.rs | 16 +-- crates/router/src/core/user.rs | 6 +- crates/router/src/core/user_role.rs | 2 +- crates/router/src/db.rs | 24 +++- crates/router/src/db/kafka_store.rs | 7 +- crates/router/src/db/organization.rs | 6 +- crates/router/src/routes/app.rs | 48 +++++--- crates/router/src/routes/payments.rs | 12 +- crates/router/src/types/domain/user.rs | 2 +- crates/router/src/utils/user/theme.rs | 2 +- crates/storage_impl/src/config.rs | 4 +- crates/storage_impl/src/database/store.rs | 44 +++++++ crates/storage_impl/src/lib.rs | 16 +++ loadtest/config/development.toml | 3 +- .../down.sql | 3 + .../up.sql | 3 + .../down.sql | 3 - .../up.sql | 2 - 26 files changed, 308 insertions(+), 56 deletions(-) create mode 100644 v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/down.sql create mode 100644 v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/up.sql diff --git a/config/config.example.toml b/config/config.example.toml index 64b382eeba..17b0d5b8c7 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -778,6 +778,7 @@ global_tenant = { tenant_id = "global", schema = "public", redis_key_prefix = "" [multitenancy.tenants.public] base_url = "http://localhost:8080" # URL of the tenant schema = "public" # 
Postgres db schema +accounts_schema = "public" redis_key_prefix = "" # Redis key distinguisher clickhouse_database = "default" # Clickhouse database diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 4e5fa3dba4..157e0aa3a4 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -308,6 +308,7 @@ global_tenant = { tenant_id = "global", schema = "public", redis_key_prefix = "" [multitenancy.tenants.public] base_url = "http://localhost:8080" schema = "public" +accounts_schema = "public" redis_key_prefix = "" clickhouse_database = "default" diff --git a/config/development.toml b/config/development.toml index 7c4af97971..153c8a996d 100644 --- a/config/development.toml +++ b/config/development.toml @@ -860,6 +860,7 @@ global_tenant = { tenant_id = "global", schema = "public", redis_key_prefix = "g [multitenancy.tenants.public] base_url = "http://localhost:8080" schema = "public" +accounts_schema = "public" redis_key_prefix = "" clickhouse_database = "default" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 48df4c2c93..48581fbea0 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -649,7 +649,8 @@ global_tenant = { tenant_id = "global", schema = "public", redis_key_prefix = "" [multitenancy.tenants.public] base_url = "http://localhost:8080" -schema = "public" +schema = "public" +accounts_schema = "public" redis_key_prefix = "" clickhouse_database = "default" diff --git a/crates/diesel_models/src/organization.rs b/crates/diesel_models/src/organization.rs index 6a3cad24e1..753f1cf3c3 100644 --- a/crates/diesel_models/src/organization.rs +++ b/crates/diesel_models/src/organization.rs @@ -100,7 +100,7 @@ impl Organization { pub struct OrganizationNew { org_id: id_type::OrganizationId, org_name: Option, - id: Option, + id: id_type::OrganizationId, organization_name: Option, pub organization_details: Option, pub metadata: Option, @@ -126,7 +126,7 
@@ impl OrganizationNew { Self { org_id: id.clone(), org_name: organization_name.clone(), - id: Some(id), + id, organization_name, organization_details: None, metadata: None, diff --git a/crates/drainer/src/settings.rs b/crates/drainer/src/settings.rs index 9b6c88b346..ea926f32af 100644 --- a/crates/drainer/src/settings.rs +++ b/crates/drainer/src/settings.rs @@ -146,6 +146,7 @@ impl<'de> Deserialize<'de> for TenantConfig { struct Inner { base_url: String, schema: String, + accounts_schema: String, redis_key_prefix: String, clickhouse_database: String, } @@ -162,6 +163,7 @@ impl<'de> Deserialize<'de> for TenantConfig { tenant_id: key, base_url: value.base_url, schema: value.schema, + accounts_schema: value.accounts_schema, redis_key_prefix: value.redis_key_prefix, clickhouse_database: value.clickhouse_database, }, @@ -177,6 +179,7 @@ pub struct Tenant { pub tenant_id: id_type::TenantId, pub base_url: String, pub schema: String, + pub accounts_schema: String, pub redis_key_prefix: String, pub clickhouse_database: String, } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 5445cb8697..91ef78a68d 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, path::PathBuf, + sync::Arc, }; #[cfg(feature = "olap")] @@ -32,11 +33,14 @@ use serde::Deserialize; use storage_impl::config::QueueStrategy; #[cfg(feature = "olap")] -use crate::analytics::AnalyticsConfig; +use crate::analytics::{AnalyticsConfig, AnalyticsProvider}; use crate::{ + configs, core::errors::{ApplicationError, ApplicationResult}, env::{self, Env}, events::EventsConfig, + routes::app, + AppState, }; pub const REQUIRED_FIELDS_CONFIG_FILE: &str = "payment_required_fields_v2.toml"; @@ -171,11 +175,96 @@ pub struct DecisionConfig { #[derive(Debug, Clone, Default)] pub struct TenantConfig(pub HashMap); +impl TenantConfig { + /// # Panics + /// + /// Panics if Failed to 
create event handler + pub async fn get_store_interface_map( + &self, + storage_impl: &app::StorageImpl, + conf: &configs::Settings, + cache_store: Arc, + testable: bool, + ) -> HashMap> { + #[allow(clippy::expect_used)] + let event_handler = conf + .events + .get_event_handler() + .await + .expect("Failed to create event handler"); + futures::future::join_all(self.0.iter().map(|(tenant_name, tenant)| async { + let store = AppState::get_store_interface( + storage_impl, + &event_handler, + conf, + tenant, + cache_store.clone(), + testable, + ) + .await + .get_storage_interface(); + (tenant_name.clone(), store) + })) + .await + .into_iter() + .collect() + } + /// # Panics + /// + /// Panics if Failed to create event handler + pub async fn get_accounts_store_interface_map( + &self, + storage_impl: &app::StorageImpl, + conf: &configs::Settings, + cache_store: Arc, + testable: bool, + ) -> HashMap> { + #[allow(clippy::expect_used)] + let event_handler = conf + .events + .get_event_handler() + .await + .expect("Failed to create event handler"); + futures::future::join_all(self.0.iter().map(|(tenant_name, tenant)| async { + let store = AppState::get_store_interface( + storage_impl, + &event_handler, + conf, + tenant, + cache_store.clone(), + testable, + ) + .await + .get_accounts_storage_interface(); + (tenant_name.clone(), store) + })) + .await + .into_iter() + .collect() + } + #[cfg(feature = "olap")] + pub async fn get_pools_map( + &self, + analytics_config: &AnalyticsConfig, + ) -> HashMap { + futures::future::join_all(self.0.iter().map(|(tenant_name, tenant)| async { + ( + tenant_name.clone(), + AnalyticsProvider::from_conf(analytics_config, tenant).await, + ) + })) + .await + .into_iter() + .collect() + } +} + #[derive(Debug, Clone)] pub struct Tenant { pub tenant_id: id_type::TenantId, pub base_url: String, pub schema: String, + pub accounts_schema: String, pub redis_key_prefix: String, pub clickhouse_database: String, pub user: TenantUserConfig, @@ -187,6 +276,12 
@@ pub struct TenantUserConfig { } impl storage_impl::config::TenantConfig for Tenant { + fn get_tenant_id(&self) -> &id_type::TenantId { + &self.tenant_id + } + fn get_accounts_schema(&self) -> &str { + self.accounts_schema.as_str() + } fn get_schema(&self) -> &str { self.schema.as_str() } @@ -198,6 +293,7 @@ impl storage_impl::config::TenantConfig for Tenant { } } +// Todo: Global tenant should not be part of tenant config(https://github.com/juspay/hyperswitch/issues/7237) #[derive(Debug, Deserialize, Clone)] pub struct GlobalTenant { #[serde(default = "id_type::TenantId::get_default_global_tenant_id")] @@ -206,8 +302,14 @@ pub struct GlobalTenant { pub redis_key_prefix: String, pub clickhouse_database: String, } - +// Todo: Global tenant should not be part of tenant config impl storage_impl::config::TenantConfig for GlobalTenant { + fn get_tenant_id(&self) -> &id_type::TenantId { + &self.tenant_id + } + fn get_accounts_schema(&self) -> &str { + self.schema.as_str() + } fn get_schema(&self) -> &str { self.schema.as_str() } @@ -1169,6 +1271,7 @@ impl<'de> Deserialize<'de> for TenantConfig { struct Inner { base_url: String, schema: String, + accounts_schema: String, redis_key_prefix: String, clickhouse_database: String, user: TenantUserConfig, @@ -1186,6 +1289,7 @@ impl<'de> Deserialize<'de> for TenantConfig { tenant_id: key, base_url: value.base_url, schema: value.schema, + accounts_schema: value.accounts_schema, redis_key_prefix: value.redis_key_prefix, clickhouse_database: value.clickhouse_database, user: value.user, diff --git a/crates/router/src/connection.rs b/crates/router/src/connection.rs index 009149b722..818dab4411 100644 --- a/crates/router/src/connection.rs +++ b/crates/router/src/connection.rs @@ -48,6 +48,32 @@ pub async fn pg_connection_read( .change_context(storage_errors::StorageError::DatabaseConnectionError) } +pub async fn pg_accounts_connection_read( + store: &T, +) -> errors::CustomResult< + PooledConnection<'_, 
async_bb8_diesel::ConnectionManager>, + storage_errors::StorageError, +> { + // If only OLAP is enabled get replica pool. + #[cfg(all(feature = "olap", not(feature = "oltp")))] + let pool = store.get_accounts_replica_pool(); + + // If either one of these are true we need to get master pool. + // 1. Only OLTP is enabled. + // 2. Both OLAP and OLTP is enabled. + // 3. Both OLAP and OLTP is disabled. + #[cfg(any( + all(not(feature = "olap"), feature = "oltp"), + all(feature = "olap", feature = "oltp"), + all(not(feature = "olap"), not(feature = "oltp")) + ))] + let pool = store.get_accounts_master_pool(); + + pool.get() + .await + .change_context(storage_errors::StorageError::DatabaseConnectionError) +} + pub async fn pg_connection_write( store: &T, ) -> errors::CustomResult< @@ -61,3 +87,17 @@ pub async fn pg_connection_write( .await .change_context(storage_errors::StorageError::DatabaseConnectionError) } + +pub async fn pg_accounts_connection_write( + store: &T, +) -> errors::CustomResult< + PooledConnection<'_, async_bb8_diesel::ConnectionManager>, + storage_errors::StorageError, +> { + // Since all writes should happen to master DB only choose master DB. 
+ let pool = store.get_accounts_master_pool(); + + pool.get() + .await + .change_context(storage_errors::StorageError::DatabaseConnectionError) +} diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 0e915dd770..958dbf7438 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -34,7 +34,7 @@ use crate::{ pm_auth::helpers::PaymentAuthConnectorDataExt, routing, utils as core_utils, }, - db::StorageInterface, + db::{AccountsStorageInterface, StorageInterface}, routes::{metrics, SessionState}, services::{ self, @@ -120,7 +120,7 @@ pub async fn create_organization( ) -> RouterResponse { let db_organization = ForeignFrom::foreign_from(req); state - .store + .accounts_store .insert_organization(db_organization) .await .to_duplicate_response(errors::ApiErrorResponse::GenericDuplicateError { @@ -143,7 +143,7 @@ pub async fn update_organization( metadata: req.metadata, }; state - .store + .accounts_store .update_organization_by_org_id(&org_id.organization_id, organization_update) .await .to_not_found_response(errors::ApiErrorResponse::GenericNotFoundError { @@ -165,7 +165,7 @@ pub async fn get_organization( #[cfg(all(feature = "v1", feature = "olap"))] { CreateOrValidateOrganization::new(Some(org_id.organization_id)) - .create_or_validate(state.store.as_ref()) + .create_or_validate(state.accounts_store.as_ref()) .await .map(ForeignFrom::foreign_from) .map(service_api::ApplicationResponse::Json) @@ -173,7 +173,7 @@ pub async fn get_organization( #[cfg(all(feature = "v2", feature = "olap"))] { CreateOrValidateOrganization::new(org_id.organization_id) - .create_or_validate(state.store.as_ref()) + .create_or_validate(state.accounts_store.as_ref()) .await .map(ForeignFrom::foreign_from) .map(service_api::ApplicationResponse::Json) @@ -283,7 +283,7 @@ impl MerchantAccountCreateBridge for api::MerchantAccountCreate { key_store: domain::MerchantKeyStore, identifier: &id_type::MerchantId, ) -> RouterResult { - let db = 
&*state.store; + let db = &*state.accounts_store; let publishable_key = create_merchant_publishable_key(); let primary_business_details = self.get_primary_details_as_value().change_context( @@ -454,7 +454,7 @@ impl CreateOrValidateOrganization { /// Apply the action, whether to create the organization or validate the given organization_id async fn create_or_validate( &self, - db: &dyn StorageInterface, + db: &dyn AccountsStorageInterface, ) -> RouterResult { match self { #[cfg(feature = "v1")] @@ -609,7 +609,7 @@ impl MerchantAccountCreateBridge for api::MerchantAccountCreate { identifier: &id_type::MerchantId, ) -> RouterResult { let publishable_key = create_merchant_publishable_key(); - let db = &*state.store; + let db = &*state.accounts_store; let metadata = self.get_metadata_as_secret().change_context( errors::ApiErrorResponse::InvalidDataValue { diff --git a/crates/router/src/core/user.rs b/crates/router/src/core/user.rs index ead0d4cd57..4f19f040bd 100644 --- a/crates/router/src/core/user.rs +++ b/crates/router/src/core/user.rs @@ -1462,7 +1462,7 @@ pub async fn create_org_merchant_for_user( ) -> UserResponse<()> { let db_organization = ForeignFrom::foreign_from(req.clone()); let org: diesel_models::organization::Organization = state - .store + .accounts_store .insert_organization(db_organization) .await .change_context(UserErrors::InternalServerError)?; @@ -1548,7 +1548,7 @@ pub async fn list_user_roles_details( .collect::>(); let org_name = state - .store + .accounts_store .find_organization_by_org_id(&user_from_token.org_id) .await .change_context(UserErrors::InternalServerError) @@ -2837,7 +2837,7 @@ pub async fn list_orgs_for_user( let resp = futures::future::try_join_all( orgs.iter() - .map(|org_id| state.store.find_organization_by_org_id(org_id)), + .map(|org_id| state.accounts_store.find_organization_by_org_id(org_id)), ) .await .change_context(UserErrors::InternalServerError)? 
diff --git a/crates/router/src/core/user_role.rs b/crates/router/src/core/user_role.rs index 2b09a4ae27..0ece98da13 100644 --- a/crates/router/src/core/user_role.rs +++ b/crates/router/src/core/user_role.rs @@ -988,7 +988,7 @@ pub async fn list_invitations_for_user( let org_name_map = futures::future::try_join_all(org_ids.into_iter().map(|org_id| async { let org_name = state - .store + .accounts_store .find_organization_by_org_id(&org_id) .await .change_context(UserErrors::InternalServerError)? diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 17da4902ba..70bc533f9a 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -126,7 +126,6 @@ pub trait StorageInterface: + RedisConnInterface + RequestIdStore + business_profile::ProfileInterface - + OrganizationInterface + routing_algorithm::RoutingAlgorithmInterface + gsm::GsmInterface + unified_translations::UnifiedTranslationsInterface @@ -159,9 +158,18 @@ pub trait GlobalStorageInterface: { } -pub trait CommonStorageInterface: StorageInterface + GlobalStorageInterface { +#[async_trait::async_trait] +pub trait AccountsStorageInterface: + Send + Sync + dyn_clone::DynClone + OrganizationInterface + 'static +{ +} + +pub trait CommonStorageInterface: + StorageInterface + GlobalStorageInterface + AccountsStorageInterface +{ fn get_storage_interface(&self) -> Box; fn get_global_storage_interface(&self) -> Box; + fn get_accounts_storage_interface(&self) -> Box; } pub trait MasterKeyInterface { @@ -198,6 +206,8 @@ impl StorageInterface for Store { #[async_trait::async_trait] impl GlobalStorageInterface for Store {} +impl AccountsStorageInterface for Store {} + #[async_trait::async_trait] impl StorageInterface for MockDb { fn get_scheduler_db(&self) -> Box { @@ -212,6 +222,8 @@ impl StorageInterface for MockDb { #[async_trait::async_trait] impl GlobalStorageInterface for MockDb {} +impl AccountsStorageInterface for MockDb {} + impl CommonStorageInterface for MockDb { fn 
get_global_storage_interface(&self) -> Box { Box::new(self.clone()) @@ -219,6 +231,10 @@ impl CommonStorageInterface for MockDb { fn get_storage_interface(&self) -> Box { Box::new(self.clone()) } + + fn get_accounts_storage_interface(&self) -> Box { + Box::new(self.clone()) + } } impl CommonStorageInterface for Store { @@ -228,6 +244,9 @@ impl CommonStorageInterface for Store { fn get_storage_interface(&self) -> Box { Box::new(self.clone()) } + fn get_accounts_storage_interface(&self) -> Box { + Box::new(self.clone()) + } } pub trait RequestIdStore { @@ -267,6 +286,7 @@ where dyn_clone::clone_trait_object!(StorageInterface); dyn_clone::clone_trait_object!(GlobalStorageInterface); +dyn_clone::clone_trait_object!(AccountsStorageInterface); impl RequestIdStore for KafkaStore { fn add_request_id(&mut self, request_id: String) { diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 5d961317a1..d4a1fee092 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -80,7 +80,8 @@ use crate::{ reverse_lookup::ReverseLookupInterface, routing_algorithm::RoutingAlgorithmInterface, unified_translations::UnifiedTranslationsInterface, - CommonStorageInterface, GlobalStorageInterface, MasterKeyInterface, StorageInterface, + AccountsStorageInterface, CommonStorageInterface, GlobalStorageInterface, + MasterKeyInterface, StorageInterface, }, services::{kafka::KafkaProducer, Store}, types::{domain, storage, AccessToken}, @@ -3095,6 +3096,7 @@ impl StorageInterface for KafkaStore { } impl GlobalStorageInterface for KafkaStore {} +impl AccountsStorageInterface for KafkaStore {} impl CommonStorageInterface for KafkaStore { fn get_storage_interface(&self) -> Box { @@ -3103,6 +3105,9 @@ impl CommonStorageInterface for KafkaStore { fn get_global_storage_interface(&self) -> Box { Box::new(self.clone()) } + fn get_accounts_storage_interface(&self) -> Box { + Box::new(self.clone()) + } } #[async_trait::async_trait] diff 
--git a/crates/router/src/db/organization.rs b/crates/router/src/db/organization.rs index bc79fdc0a0..b1dc58f040 100644 --- a/crates/router/src/db/organization.rs +++ b/crates/router/src/db/organization.rs @@ -31,7 +31,7 @@ impl OrganizationInterface for Store { &self, organization: storage::OrganizationNew, ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; + let conn = connection::pg_accounts_connection_write(self).await?; organization .insert(&conn) .await @@ -43,7 +43,7 @@ impl OrganizationInterface for Store { &self, org_id: &id_type::OrganizationId, ) -> CustomResult { - let conn = connection::pg_connection_read(self).await?; + let conn = connection::pg_accounts_connection_read(self).await?; storage::Organization::find_by_org_id(&conn, org_id.to_owned()) .await .map_err(|error| report!(errors::StorageError::from(error))) @@ -55,7 +55,7 @@ impl OrganizationInterface for Store { org_id: &id_type::OrganizationId, update: storage::OrganizationUpdate, ) -> CustomResult { - let conn = connection::pg_connection_write(self).await?; + let conn = connection::pg_accounts_connection_write(self).await?; storage::Organization::update_by_org_id(&conn, org_id.to_owned(), update) .await diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index c5e1f8a108..ee1e06af93 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -78,7 +78,10 @@ use crate::routes::fraud_check as frm_routes; use crate::routes::recon as recon_routes; pub use crate::{ configs::settings, - db::{CommonStorageInterface, GlobalStorageInterface, StorageImpl, StorageInterface}, + db::{ + AccountsStorageInterface, CommonStorageInterface, GlobalStorageInterface, StorageImpl, + StorageInterface, + }, events::EventsHandler, services::{get_cache_store, get_store}, }; @@ -97,6 +100,7 @@ pub struct SessionState { pub store: Box, /// Global store is used for global schema operations in tables like Users and Tenants pub global_store: 
Box, + pub accounts_store: Box, pub conf: Arc>, pub api_client: Box, pub event_handler: EventsHandler, @@ -203,6 +207,8 @@ impl SessionStateInfo for SessionState { pub struct AppState { pub flow_name: String, pub global_store: Box, + // TODO: use a separate schema for accounts_store + pub accounts_store: HashMap>, pub stores: HashMap>, pub conf: Arc>, pub event_handler: EventsHandler, @@ -338,9 +344,6 @@ impl AppState { .expect("Failed to create opensearch client"), ); - #[cfg(feature = "olap")] - let mut pools: HashMap = HashMap::new(); - let mut stores = HashMap::new(); #[allow(clippy::expect_used)] let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable) .await @@ -355,23 +358,27 @@ impl AppState { ) .await .get_global_storage_interface(); - for (tenant_name, tenant) in conf.clone().multitenancy.get_tenants() { - let store: Box = Self::get_store_interface( + #[cfg(feature = "olap")] + let pools = conf + .multitenancy + .tenants + .get_pools_map(conf.analytics.get_inner()) + .await; + let stores = conf + .multitenancy + .tenants + .get_store_interface_map(&storage_impl, &conf, Arc::clone(&cache_store), testable) + .await; + let accounts_store = conf + .multitenancy + .tenants + .get_accounts_store_interface_map( &storage_impl, - &event_handler, &conf, - tenant, Arc::clone(&cache_store), testable, ) - .await - .get_storage_interface(); - stores.insert(tenant_name.clone(), store); - #[cfg(feature = "olap")] - let pool = AnalyticsProvider::from_conf(conf.analytics.get_inner(), tenant).await; - #[cfg(feature = "olap")] - pools.insert(tenant_name.clone(), pool); - } + .await; #[cfg(feature = "email")] let email_client = Arc::new(create_email_client(&conf).await); @@ -385,6 +392,7 @@ impl AppState { flow_name: String::from("default"), stores, global_store, + accounts_store, conf: Arc::new(conf), #[cfg(feature = "email")] email_client, @@ -404,7 +412,10 @@ impl AppState { .await } - async fn get_store_interface( + /// # Panics + /// + /// Panics if 
Failed to create store + pub async fn get_store_interface( storage_impl: &StorageImpl, event_handler: &EventsHandler, conf: &Settings, @@ -421,7 +432,7 @@ impl AppState { .await .expect("Failed to create store"), kafka_client.clone(), - TenantID(tenant.get_schema().to_string()), + TenantID(tenant.get_tenant_id().get_string_repr().to_owned()), tenant, ) .await, @@ -471,6 +482,7 @@ impl AppState { Ok(SessionState { store: self.stores.get(tenant).ok_or_else(err)?.clone(), global_store: self.global_store.clone(), + accounts_store: self.accounts_store.get(tenant).ok_or_else(err)?.clone(), conf: Arc::clone(&self.conf), api_client: self.api_client.clone(), event_handler, diff --git a/crates/router/src/routes/payments.rs b/crates/router/src/routes/payments.rs index a088f04b09..b55ab207f4 100644 --- a/crates/router/src/routes/payments.rs +++ b/crates/router/src/routes/payments.rs @@ -966,7 +966,7 @@ pub async fn payments_redirect_response( creds_identifier: None, }; let locking_action = payload.get_locking_input(flow.clone()); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -984,7 +984,7 @@ pub async fn payments_redirect_response( }, &auth::MerchantIdAuth(merchant_id), locking_action, - ) + )) .await } @@ -1016,7 +1016,7 @@ pub async fn payments_redirect_response_with_creds_identifier( }; let flow = Flow::PaymentsRedirect; let locking_action = payload.get_locking_input(flow.clone()); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -1034,7 +1034,7 @@ pub async fn payments_redirect_response_with_creds_identifier( }, &auth::MerchantIdAuth(merchant_id), locking_action, - ) + )) .await } @@ -1066,7 +1066,7 @@ pub async fn payments_complete_authorize_redirect( creds_identifier: None, }; let locking_action = payload.get_locking_input(flow.clone()); - api::server_wrap( + Box::pin(api::server_wrap( flow, state, &req, @@ -1085,7 +1085,7 @@ pub async fn payments_complete_authorize_redirect( }, &auth::MerchantIdAuth(merchant_id), 
locking_action, - ) + )) .await } diff --git a/crates/router/src/types/domain/user.rs b/crates/router/src/types/domain/user.rs index 569e7f9a99..1eb2305058 100644 --- a/crates/router/src/types/domain/user.rs +++ b/crates/router/src/types/domain/user.rs @@ -245,7 +245,7 @@ pub struct NewUserOrganization(diesel_org::OrganizationNew); impl NewUserOrganization { pub async fn insert_org_in_db(self, state: SessionState) -> UserResult { state - .store + .accounts_store .insert_organization(self.0) .await .map_err(|e| { diff --git a/crates/router/src/utils/user/theme.rs b/crates/router/src/utils/user/theme.rs index 9cc1c43462..0b9d4e2829 100644 --- a/crates/router/src/utils/user/theme.rs +++ b/crates/router/src/utils/user/theme.rs @@ -100,7 +100,7 @@ fn validate_tenant(state: &SessionState, tenant_id: &id_type::TenantId) -> UserR async fn validate_org(state: &SessionState, org_id: &id_type::OrganizationId) -> UserResult<()> { state - .store + .accounts_store .find_organization_by_org_id(org_id) .await .to_not_found_response(UserErrors::InvalidThemeLineage("org_id".to_string())) diff --git a/crates/storage_impl/src/config.rs b/crates/storage_impl/src/config.rs index ac68d1718a..857c23161e 100644 --- a/crates/storage_impl/src/config.rs +++ b/crates/storage_impl/src/config.rs @@ -1,4 +1,4 @@ -use common_utils::DbConnectionParams; +use common_utils::{id_type, DbConnectionParams}; use masking::Secret; #[derive(Debug, Clone, serde::Deserialize)] @@ -34,7 +34,9 @@ impl DbConnectionParams for Database { } pub trait TenantConfig: Send + Sync { + fn get_tenant_id(&self) -> &id_type::TenantId; fn get_schema(&self) -> &str; + fn get_accounts_schema(&self) -> &str; fn get_redis_key_prefix(&self) -> &str; fn get_clickhouse_database(&self) -> &str; } diff --git a/crates/storage_impl/src/database/store.rs b/crates/storage_impl/src/database/store.rs index 543cc47870..547be83e3a 100644 --- a/crates/storage_impl/src/database/store.rs +++ b/crates/storage_impl/src/database/store.rs @@ -20,11 
+20,14 @@ pub trait DatabaseStore: Clone + Send + Sync { ) -> StorageResult; fn get_master_pool(&self) -> &PgPool; fn get_replica_pool(&self) -> &PgPool; + fn get_accounts_master_pool(&self) -> &PgPool; + fn get_accounts_replica_pool(&self) -> &PgPool; } #[derive(Debug, Clone)] pub struct Store { pub master_pool: PgPool, + pub accounts_pool: PgPool, } #[async_trait::async_trait] @@ -38,6 +41,12 @@ impl DatabaseStore for Store { Ok(Self { master_pool: diesel_make_pg_pool(&config, tenant_config.get_schema(), test_transaction) .await?, + accounts_pool: diesel_make_pg_pool( + &config, + tenant_config.get_accounts_schema(), + test_transaction, + ) + .await?, }) } @@ -48,12 +57,22 @@ impl DatabaseStore for Store { fn get_replica_pool(&self) -> &PgPool { &self.master_pool } + + fn get_accounts_master_pool(&self) -> &PgPool { + &self.accounts_pool + } + + fn get_accounts_replica_pool(&self) -> &PgPool { + &self.accounts_pool + } } #[derive(Debug, Clone)] pub struct ReplicaStore { pub master_pool: PgPool, pub replica_pool: PgPool, + pub accounts_master_pool: PgPool, + pub accounts_replica_pool: PgPool, } #[async_trait::async_trait] @@ -69,6 +88,13 @@ impl DatabaseStore for ReplicaStore { diesel_make_pg_pool(&master_config, tenant_config.get_schema(), test_transaction) .await .attach_printable("failed to create master pool")?; + let accounts_master_pool = diesel_make_pg_pool( + &master_config, + tenant_config.get_accounts_schema(), + test_transaction, + ) + .await + .attach_printable("failed to create accounts master pool")?; let replica_pool = diesel_make_pg_pool( &replica_config, tenant_config.get_schema(), @@ -76,9 +102,19 @@ impl DatabaseStore for ReplicaStore { ) .await .attach_printable("failed to create replica pool")?; + + let accounts_replica_pool = diesel_make_pg_pool( + &replica_config, + tenant_config.get_accounts_schema(), + test_transaction, + ) + .await + .attach_printable("failed to create accounts pool")?; Ok(Self { master_pool, replica_pool, + 
accounts_master_pool, + accounts_replica_pool, }) } @@ -89,6 +125,14 @@ impl DatabaseStore for ReplicaStore { fn get_replica_pool(&self) -> &PgPool { &self.replica_pool } + + fn get_accounts_master_pool(&self) -> &PgPool { + &self.accounts_master_pool + } + + fn get_accounts_replica_pool(&self) -> &PgPool { + &self.accounts_replica_pool + } } pub async fn diesel_make_pg_pool( diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 8a32901044..137cd2a5d4 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -86,6 +86,14 @@ where fn get_replica_pool(&self) -> &PgPool { self.db_store.get_replica_pool() } + + fn get_accounts_master_pool(&self) -> &PgPool { + self.db_store.get_accounts_master_pool() + } + + fn get_accounts_replica_pool(&self) -> &PgPool { + self.db_store.get_accounts_replica_pool() + } } impl RedisConnInterface for RouterStore { @@ -203,6 +211,14 @@ where fn get_replica_pool(&self) -> &PgPool { self.router_store.get_replica_pool() } + + fn get_accounts_master_pool(&self) -> &PgPool { + self.router_store.get_accounts_master_pool() + } + + fn get_accounts_replica_pool(&self) -> &PgPool { + self.router_store.get_accounts_replica_pool() + } } impl RedisConnInterface for KVRouterStore { diff --git a/loadtest/config/development.toml b/loadtest/config/development.toml index 4b7d194b6f..ed2ea37307 100644 --- a/loadtest/config/development.toml +++ b/loadtest/config/development.toml @@ -410,7 +410,8 @@ global_tenant = { tenant_id = "global", schema = "public", redis_key_prefix = "" [multitenancy.tenants.public] base_url = "http://localhost:8080" -schema = "public" +schema = "public" +accounts_schema = "public" redis_key_prefix = "" clickhouse_database = "default" diff --git a/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/down.sql b/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/down.sql new file mode 100644 index 0000000000..46c0d151e1 --- /dev/null +++ 
b/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE organization ALTER COLUMN org_id SET NOT NULL; +ALTER TABLE organization ADD PRIMARY KEY (org_id); \ No newline at end of file diff --git a/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/up.sql b/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/up.sql new file mode 100644 index 0000000000..886cb0bf58 --- /dev/null +++ b/v2_migrations/2024-08-28-081722_drop_not_null_constraints_on_v1_columns/up.sql @@ -0,0 +1,3 @@ +-- Drop not null constraint on org_id in organization table +ALTER TABLE organization DROP CONSTRAINT organization_pkey; +ALTER TABLE organization ALTER COLUMN org_id DROP NOT NULL; \ No newline at end of file diff --git a/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/down.sql b/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/down.sql index 661f7f294e..5903aea89f 100644 --- a/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/down.sql +++ b/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/down.sql @@ -6,9 +6,6 @@ WHERE org_id IS NULL; ALTER TABLE ORGANIZATION DROP CONSTRAINT organization_pkey_id; -ALTER TABLE ORGANIZATION -ADD CONSTRAINT organization_pkey PRIMARY KEY (org_id); - ALTER TABLE ORGANIZATION DROP CONSTRAINT organization_organization_name_key; -- back fill diff --git a/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/up.sql b/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/up.sql index 3b1fe9b46a..0f5bd32e10 100644 --- a/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/up.sql +++ b/v2_migrations/2024-08-28-081838_update_v2_primary_key_constraints/up.sql @@ -11,8 +11,6 @@ WHERE organization_name IS NULL AND org_name IS NOT NULL; -- Alter queries for organization table -ALTER TABLE ORGANIZATION DROP CONSTRAINT 
organization_pkey; - ALTER TABLE ORGANIZATION ADD CONSTRAINT organization_pkey_id PRIMARY KEY (id);