mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 10:06:32 +08:00 
			
		
		
		
	feat(multitenancy): add tenant_id as a field for data pipeline and support individual database for clickhouse (#4867)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com> Co-authored-by: Arun Raj M <jarnura47@gmail.com> Co-authored-by: Sampras Lopes <sampras.lopes@juspay.in>
This commit is contained in:
		| @ -641,6 +641,7 @@ sdk_eligible_payment_methods = "card" | |||||||
|  |  | ||||||
| [multitenancy] | [multitenancy] | ||||||
| enabled = false | enabled = false | ||||||
|  | global_tenant = { schema = "public", redis_key_prefix = "" } | ||||||
|  |  | ||||||
| [multitenancy.tenants] | [multitenancy.tenants] | ||||||
| public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant  | public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant  | ||||||
| @ -256,6 +256,7 @@ region = "kms_region" # The AWS region used by the KMS SDK for decrypting data. | |||||||
|  |  | ||||||
| [multitenancy] | [multitenancy] | ||||||
| enabled = false | enabled = false | ||||||
|  | global_tenant = { schema = "public", redis_key_prefix = "" } | ||||||
|  |  | ||||||
| [multitenancy.tenants] | [multitenancy.tenants] | ||||||
| public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} | public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} | ||||||
| @ -650,6 +650,7 @@ sdk_eligible_payment_methods = "card" | |||||||
|  |  | ||||||
| [multitenancy] | [multitenancy] | ||||||
| enabled = false | enabled = false | ||||||
|  | global_tenant = { schema = "public", redis_key_prefix = "" } | ||||||
|  |  | ||||||
| [multitenancy.tenants] | [multitenancy.tenants] | ||||||
| public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} | public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} | ||||||
|  | |||||||
| @ -504,6 +504,7 @@ sdk_eligible_payment_methods = "card" | |||||||
|  |  | ||||||
| [multitenancy] | [multitenancy] | ||||||
| enabled = false | enabled = false | ||||||
|  | global_tenant = { schema = "public", redis_key_prefix = "" } | ||||||
|  |  | ||||||
| [multitenancy.tenants] | [multitenancy.tenants] | ||||||
| public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} | public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} | ||||||
| @ -35,6 +35,7 @@ pub type ClickhouseResult<T> = error_stack::Result<T, ClickhouseError>; | |||||||
| #[derive(Clone, Debug)] | #[derive(Clone, Debug)] | ||||||
| pub struct ClickhouseClient { | pub struct ClickhouseClient { | ||||||
|     pub config: Arc<ClickhouseConfig>, |     pub config: Arc<ClickhouseConfig>, | ||||||
|  |     pub database: String, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Clone, Debug, serde::Deserialize)] | #[derive(Clone, Debug, serde::Deserialize)] | ||||||
| @ -42,7 +43,6 @@ pub struct ClickhouseConfig { | |||||||
|     username: String, |     username: String, | ||||||
|     password: Option<String>, |     password: Option<String>, | ||||||
|     host: String, |     host: String, | ||||||
|     database_name: String, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Default for ClickhouseConfig { | impl Default for ClickhouseConfig { | ||||||
| @ -51,7 +51,6 @@ impl Default for ClickhouseConfig { | |||||||
|             username: "default".to_string(), |             username: "default".to_string(), | ||||||
|             password: None, |             password: None, | ||||||
|             host: "http://localhost:8123".to_string(), |             host: "http://localhost:8123".to_string(), | ||||||
|             database_name: "default".to_string(), |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @ -63,7 +62,7 @@ impl ClickhouseClient { | |||||||
|         let params = CkhQuery { |         let params = CkhQuery { | ||||||
|             date_time_output_format: String::from("iso"), |             date_time_output_format: String::from("iso"), | ||||||
|             output_format_json_quote_64bit_integers: 0, |             output_format_json_quote_64bit_integers: 0, | ||||||
|             database: self.config.database_name.clone(), |             database: self.database.clone(), | ||||||
|         }; |         }; | ||||||
|         let response = client |         let response = client | ||||||
|             .post(&self.config.host) |             .post(&self.config.host) | ||||||
|  | |||||||
| @ -601,22 +601,30 @@ impl AnalyticsProvider { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub async fn from_conf(config: &AnalyticsConfig, tenant: &str) -> Self { |     pub async fn from_conf( | ||||||
|  |         config: &AnalyticsConfig, | ||||||
|  |         tenant: &dyn storage_impl::config::ClickHouseConfig, | ||||||
|  |     ) -> Self { | ||||||
|         match config { |         match config { | ||||||
|             AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx, tenant).await), |             AnalyticsConfig::Sqlx { sqlx } => { | ||||||
|  |                 Self::Sqlx(SqlxClient::from_conf(sqlx, tenant.get_schema()).await) | ||||||
|  |             } | ||||||
|             AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient { |             AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient { | ||||||
|                 config: Arc::new(clickhouse.clone()), |                 config: Arc::new(clickhouse.clone()), | ||||||
|  |                 database: tenant.get_clickhouse_database().to_string(), | ||||||
|             }), |             }), | ||||||
|             AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh( |             AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh( | ||||||
|                 SqlxClient::from_conf(sqlx, tenant).await, |                 SqlxClient::from_conf(sqlx, tenant.get_schema()).await, | ||||||
|                 ClickhouseClient { |                 ClickhouseClient { | ||||||
|                     config: Arc::new(clickhouse.clone()), |                     config: Arc::new(clickhouse.clone()), | ||||||
|  |                     database: tenant.get_clickhouse_database().to_string(), | ||||||
|                 }, |                 }, | ||||||
|             ), |             ), | ||||||
|             AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx( |             AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx( | ||||||
|                 SqlxClient::from_conf(sqlx, tenant).await, |                 SqlxClient::from_conf(sqlx, tenant.get_schema()).await, | ||||||
|                 ClickhouseClient { |                 ClickhouseClient { | ||||||
|                     config: Arc::new(clickhouse.clone()), |                     config: Arc::new(clickhouse.clone()), | ||||||
|  |                     database: tenant.get_clickhouse_database().to_string(), | ||||||
|                 }, |                 }, | ||||||
|             ), |             ), | ||||||
|         } |         } | ||||||
|  | |||||||
| @ -87,8 +87,8 @@ pub const MAX_TTL_FOR_EXTENDED_CARD_INFO: u16 = 60 * 60 * 2; | |||||||
| /// Default tenant to be used when multitenancy is disabled | /// Default tenant to be used when multitenancy is disabled | ||||||
| pub const DEFAULT_TENANT: &str = "public"; | pub const DEFAULT_TENANT: &str = "public"; | ||||||
|  |  | ||||||
| /// Global tenant to be used when multitenancy is enabled | /// Default tenant to be used when multitenancy is disabled | ||||||
| pub const GLOBAL_TENANT: &str = "global"; | pub const TENANT_HEADER: &str = "x-tenant-id"; | ||||||
|  |  | ||||||
| /// Max Length for MerchantReferenceId | /// Max Length for MerchantReferenceId | ||||||
| pub const MAX_ALLOWED_MERCHANT_REFERENCE_ID_LENGTH: u8 = 64; | pub const MAX_ALLOWED_MERCHANT_REFERENCE_ID_LENGTH: u8 = 64; | ||||||
|  | |||||||
| @ -1,23 +1,9 @@ | |||||||
| use async_bb8_diesel::AsyncRunQueryDsl; |  | ||||||
| use common_utils::pii; | use common_utils::pii; | ||||||
| use diesel::{ | use diesel::{associations::HasTable, ExpressionMethods}; | ||||||
|     associations::HasTable, debug_query, result::Error as DieselError, ExpressionMethods, |  | ||||||
|     JoinOnDsl, QueryDsl, |  | ||||||
| }; |  | ||||||
| use error_stack::report; |  | ||||||
| use router_env::logger; |  | ||||||
| pub mod sample_data; | pub mod sample_data; | ||||||
|  |  | ||||||
| use crate::{ | use crate::{ | ||||||
|     errors::{self}, |     query::generics, schema::users::dsl as users_dsl, user::*, PgPooledConn, StorageResult, | ||||||
|     query::generics, |  | ||||||
|     schema::{ |  | ||||||
|         user_roles::{self, dsl as user_roles_dsl}, |  | ||||||
|         users::dsl as users_dsl, |  | ||||||
|     }, |  | ||||||
|     user::*, |  | ||||||
|     user_role::UserRole, |  | ||||||
|     PgPooledConn, StorageResult, |  | ||||||
| }; | }; | ||||||
|  |  | ||||||
| impl UserNew { | impl UserNew { | ||||||
| @ -90,27 +76,6 @@ impl User { | |||||||
|         .await |         .await | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub async fn find_joined_users_and_roles_by_merchant_id( |  | ||||||
|         conn: &PgPooledConn, |  | ||||||
|         mid: &str, |  | ||||||
|     ) -> StorageResult<Vec<(Self, UserRole)>> { |  | ||||||
|         let query = Self::table() |  | ||||||
|             .inner_join(user_roles::table.on(user_roles_dsl::user_id.eq(users_dsl::user_id))) |  | ||||||
|             .filter(user_roles_dsl::merchant_id.eq(mid.to_owned())); |  | ||||||
|  |  | ||||||
|         logger::debug!(query = %debug_query::<diesel::pg::Pg,_>(&query).to_string()); |  | ||||||
|  |  | ||||||
|         query |  | ||||||
|             .get_results_async::<(Self, UserRole)>(conn) |  | ||||||
|             .await |  | ||||||
|             .map_err(|err| match err { |  | ||||||
|                 DieselError::NotFound => { |  | ||||||
|                     report!(err).change_context(errors::DatabaseError::NotFound) |  | ||||||
|                 } |  | ||||||
|                 _ => report!(err).change_context(errors::DatabaseError::Others), |  | ||||||
|             }) |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub async fn find_users_by_user_ids( |     pub async fn find_users_by_user_ids( | ||||||
|         conn: &PgPooledConn, |         conn: &PgPooledConn, | ||||||
|         user_ids: Vec<String>, |         user_ids: Vec<String>, | ||||||
|  | |||||||
| @ -143,6 +143,7 @@ pub struct Tenant { | |||||||
|     pub base_url: String, |     pub base_url: String, | ||||||
|     pub schema: String, |     pub schema: String, | ||||||
|     pub redis_key_prefix: String, |     pub redis_key_prefix: String, | ||||||
|  |     pub clickhouse_database: String, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug, Deserialize, Clone)] | #[derive(Debug, Deserialize, Clone)] | ||||||
|  | |||||||
| @ -129,6 +129,7 @@ pub struct Settings<S: SecretState> { | |||||||
| pub struct Multitenancy { | pub struct Multitenancy { | ||||||
|     pub tenants: TenantConfig, |     pub tenants: TenantConfig, | ||||||
|     pub enabled: bool, |     pub enabled: bool, | ||||||
|  |     pub global_tenant: GlobalTenant, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Multitenancy { | impl Multitenancy { | ||||||
| @ -153,6 +154,7 @@ pub struct Tenant { | |||||||
|     pub base_url: String, |     pub base_url: String, | ||||||
|     pub schema: String, |     pub schema: String, | ||||||
|     pub redis_key_prefix: String, |     pub redis_key_prefix: String, | ||||||
|  |     pub clickhouse_database: String, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl storage_impl::config::TenantConfig for Tenant { | impl storage_impl::config::TenantConfig for Tenant { | ||||||
| @ -164,6 +166,12 @@ impl storage_impl::config::TenantConfig for Tenant { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl storage_impl::config::ClickHouseConfig for Tenant { | ||||||
|  |     fn get_clickhouse_database(&self) -> &str { | ||||||
|  |         self.clickhouse_database.as_str() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| #[derive(Debug, Deserialize, Clone, Default)] | #[derive(Debug, Deserialize, Clone, Default)] | ||||||
| pub struct GlobalTenant { | pub struct GlobalTenant { | ||||||
|     pub schema: String, |     pub schema: String, | ||||||
|  | |||||||
| @ -1,3 +1,4 @@ | |||||||
|  | use common_utils::consts::TENANT_HEADER; | ||||||
| use futures::StreamExt; | use futures::StreamExt; | ||||||
| use router_env::{ | use router_env::{ | ||||||
|     logger, |     logger, | ||||||
| @ -140,10 +141,17 @@ where | |||||||
|     // TODO: have a common source of truth for the list of top level fields |     // TODO: have a common source of truth for the list of top level fields | ||||||
|     // /crates/router_env/src/logger/storage.rs also has a list of fields  called PERSISTENT_KEYS |     // /crates/router_env/src/logger/storage.rs also has a list of fields  called PERSISTENT_KEYS | ||||||
|     fn call(&self, req: actix_web::dev::ServiceRequest) -> Self::Future { |     fn call(&self, req: actix_web::dev::ServiceRequest) -> Self::Future { | ||||||
|  |         let tenant_id = req | ||||||
|  |             .headers() | ||||||
|  |             .get(TENANT_HEADER) | ||||||
|  |             .and_then(|i| i.to_str().ok()) | ||||||
|  |             .map(|s| s.to_owned()); | ||||||
|         let response_fut = self.service.call(req); |         let response_fut = self.service.call(req); | ||||||
|  |  | ||||||
|         Box::pin( |         Box::pin( | ||||||
|             async move { |             async move { | ||||||
|  |                 if let Some(tenant_id) = tenant_id { | ||||||
|  |                     router_env::tracing::Span::current().record("tenant_id", &tenant_id); | ||||||
|  |                 } | ||||||
|                 let response = response_fut.await; |                 let response = response_fut.await; | ||||||
|                 router_env::tracing::Span::current().record("golden_log_line", true); |                 router_env::tracing::Span::current().record("golden_log_line", true); | ||||||
|                 response |                 response | ||||||
|  | |||||||
| @ -5,7 +5,6 @@ use actix_web::{web, Scope}; | |||||||
| use api_models::routing::RoutingRetrieveQuery; | use api_models::routing::RoutingRetrieveQuery; | ||||||
| #[cfg(feature = "olap")] | #[cfg(feature = "olap")] | ||||||
| use common_enums::TransactionType; | use common_enums::TransactionType; | ||||||
| use common_utils::consts::{DEFAULT_TENANT, GLOBAL_TENANT}; |  | ||||||
| #[cfg(feature = "email")] | #[cfg(feature = "email")] | ||||||
| use external_services::email::{ses::AwsSes, EmailService}; | use external_services::email::{ses::AwsSes, EmailService}; | ||||||
| use external_services::file_storage::FileStorageInterface; | use external_services::file_storage::FileStorageInterface; | ||||||
| @ -257,19 +256,11 @@ impl AppState { | |||||||
|             let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable) |             let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable) | ||||||
|                 .await |                 .await | ||||||
|                 .expect("Failed to create store"); |                 .expect("Failed to create store"); | ||||||
|             let global_tenant = if conf.multitenancy.enabled { |  | ||||||
|                 GLOBAL_TENANT |  | ||||||
|             } else { |  | ||||||
|                 DEFAULT_TENANT |  | ||||||
|             }; |  | ||||||
|             let global_store: Box<dyn GlobalStorageInterface> = Self::get_store_interface( |             let global_store: Box<dyn GlobalStorageInterface> = Self::get_store_interface( | ||||||
|                 &storage_impl, |                 &storage_impl, | ||||||
|                 &event_handler, |                 &event_handler, | ||||||
|                 &conf, |                 &conf, | ||||||
|                 &settings::GlobalTenant { |                 &conf.multitenancy.global_tenant, | ||||||
|                     schema: global_tenant.to_string(), |  | ||||||
|                     redis_key_prefix: String::default(), |  | ||||||
|                 }, |  | ||||||
|                 Arc::clone(&cache_store), |                 Arc::clone(&cache_store), | ||||||
|                 testable, |                 testable, | ||||||
|             ) |             ) | ||||||
| @ -288,9 +279,7 @@ impl AppState { | |||||||
|                 .get_storage_interface(); |                 .get_storage_interface(); | ||||||
|                 stores.insert(tenant_name.clone(), store); |                 stores.insert(tenant_name.clone(), store); | ||||||
|                 #[cfg(feature = "olap")] |                 #[cfg(feature = "olap")] | ||||||
|                 let pool = |                 let pool = AnalyticsProvider::from_conf(conf.analytics.get_inner(), tenant).await; | ||||||
|                     AnalyticsProvider::from_conf(conf.analytics.get_inner(), tenant_name.as_str()) |  | ||||||
|                         .await; |  | ||||||
|                 #[cfg(feature = "olap")] |                 #[cfg(feature = "olap")] | ||||||
|                 pools.insert(tenant_name.clone(), pool); |                 pools.insert(tenant_name.clone(), pool); | ||||||
|             } |             } | ||||||
|  | |||||||
| @ -19,7 +19,7 @@ use api_models::enums::{CaptureMethod, PaymentMethodType}; | |||||||
| pub use client::{proxy_bypass_urls, ApiClient, MockApiClient, ProxyClient}; | pub use client::{proxy_bypass_urls, ApiClient, MockApiClient, ProxyClient}; | ||||||
| pub use common_utils::request::{ContentType, Method, Request, RequestBuilder}; | pub use common_utils::request::{ContentType, Method, Request, RequestBuilder}; | ||||||
| use common_utils::{ | use common_utils::{ | ||||||
|     consts::X_HS_LATENCY, |     consts::{DEFAULT_TENANT, TENANT_HEADER, X_HS_LATENCY}, | ||||||
|     errors::{ErrorSwitch, ReportSwitchExt}, |     errors::{ErrorSwitch, ReportSwitchExt}, | ||||||
|     request::RequestContent, |     request::RequestContent, | ||||||
| }; | }; | ||||||
| @ -783,10 +783,10 @@ where | |||||||
|         .into_iter() |         .into_iter() | ||||||
|         .collect(); |         .collect(); | ||||||
|     let tenant_id = if !state.conf.multitenancy.enabled { |     let tenant_id = if !state.conf.multitenancy.enabled { | ||||||
|         common_utils::consts::DEFAULT_TENANT.to_string() |         DEFAULT_TENANT.to_string() | ||||||
|     } else { |     } else { | ||||||
|         incoming_request_header |         incoming_request_header | ||||||
|             .get("x-tenant-id") |             .get(TENANT_HEADER) | ||||||
|             .and_then(|value| value.to_str().ok()) |             .and_then(|value| value.to_str().ok()) | ||||||
|             .ok_or_else(|| errors::ApiErrorResponse::MissingTenantId.switch()) |             .ok_or_else(|| errors::ApiErrorResponse::MissingTenantId.switch()) | ||||||
|             .map(|req_tenant_id| { |             .map(|req_tenant_id| { | ||||||
|  | |||||||
| @ -38,6 +38,10 @@ pub trait TenantConfig: Send + Sync { | |||||||
|     fn get_redis_key_prefix(&self) -> &str; |     fn get_redis_key_prefix(&self) -> &str; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub trait ClickHouseConfig: TenantConfig + Send + Sync { | ||||||
|  |     fn get_clickhouse_database(&self) -> &str; | ||||||
|  | } | ||||||
|  |  | ||||||
| #[derive(Debug, serde::Deserialize, Clone, Copy, Default)] | #[derive(Debug, serde::Deserialize, Clone, Copy, Default)] | ||||||
| #[serde(rename_all = "PascalCase")] | #[serde(rename_all = "PascalCase")] | ||||||
| pub enum QueueStrategy { | pub enum QueueStrategy { | ||||||
|  | |||||||
| @ -330,6 +330,7 @@ keys = "user-agent" | |||||||
|  |  | ||||||
| [multitenancy] | [multitenancy] | ||||||
| enabled = false | enabled = false | ||||||
|  | global_tenant = { schema = "public", redis_key_prefix = "" } | ||||||
|  |  | ||||||
| [multitenancy.tenants] | [multitenancy.tenants] | ||||||
| public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} | public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} | ||||||
		Reference in New Issue
	
	Block a user
	 Jagan
					Jagan