feat(multitenancy): move users and tenants to global schema (#4781)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
This commit is contained in:
Jagan
2024-06-05 18:50:40 +05:30
committed by GitHub
parent 76ec5e1e02
commit c5e28f2670
16 changed files with 278 additions and 86 deletions

View File

@ -87,6 +87,9 @@ pub const MAX_TTL_FOR_EXTENDED_CARD_INFO: u16 = 60 * 60 * 2;
/// Default tenant to be used when multitenancy is disabled
pub const DEFAULT_TENANT: &str = "public";
/// Global tenant to be used when multitenancy is enabled
pub const GLOBAL_TENANT: &str = "global";
/// Max Length for MerchantReferenceId
pub const MAX_ALLOWED_MERCHANT_REFERENCE_ID_LENGTH: u8 = 64;

View File

@ -110,4 +110,17 @@ impl User {
_ => report!(err).change_context(errors::DatabaseError::Others),
})
}
pub async fn find_users_by_user_ids(
conn: &PgPooledConn,
user_ids: Vec<String>,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<
<Self as HasTable>::Table,
_,
<<Self as HasTable>::Table as diesel::Table>::PrimaryKey,
_,
>(conn, users_dsl::user_id.eq_any(user_ids), None, None, None)
.await
}
}

View File

@ -90,4 +90,18 @@ impl UserRole {
)
.await
}
pub async fn list_by_merchant_id(
conn: &PgPooledConn,
merchant_id: String,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::merchant_id.eq(merchant_id),
None,
None,
Some(dsl::created_at.asc()),
)
.await
}
}

View File

@ -444,7 +444,7 @@ pub mod routes {
&req,
json_payload.into_inner(),
|state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move {
let user = UserInterface::find_user_by_id(&*state.store, &user_id)
let user = UserInterface::find_user_by_id(&*state.global_store, &user_id)
.await
.change_context(AnalyticsError::UnknownError)?;
@ -486,7 +486,7 @@ pub mod routes {
&req,
json_payload.into_inner(),
|state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move {
let user = UserInterface::find_user_by_id(&*state.store, &user_id)
let user = UserInterface::find_user_by_id(&*state.global_store, &user_id)
.await
.change_context(AnalyticsError::UnknownError)?;
@ -528,7 +528,7 @@ pub mod routes {
&req,
json_payload.into_inner(),
|state, (auth, user_id): auth::AuthenticationDataWithUserId, payload, _| async move {
let user = UserInterface::find_user_by_id(&*state.store, &user_id)
let user = UserInterface::find_user_by_id(&*state.global_store, &user_id)
.await
.change_context(AnalyticsError::UnknownError)?;

View File

@ -163,6 +163,21 @@ impl storage_impl::config::TenantConfig for Tenant {
}
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct GlobalTenant {
pub schema: String,
pub redis_key_prefix: String,
}
impl storage_impl::config::TenantConfig for GlobalTenant {
fn get_schema(&self) -> &str {
self.schema.as_str()
}
fn get_redis_key_prefix(&self) -> &str {
self.redis_key_prefix.as_str()
}
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct UnmaskedHeaders {
#[serde(deserialize_with = "deserialize_hashset")]

View File

@ -1,3 +1,5 @@
use std::collections::HashMap;
use api_models::user::{self as user_api, InviteMultipleUserResponse};
#[cfg(feature = "email")]
use diesel_models::user_role::UserRoleUpdate;
@ -169,7 +171,7 @@ pub async fn signin(
request: user_api::SignInRequest,
) -> UserResponse<user_api::TokenOrPayloadResponse<user_api::SignInResponse>> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(&request.email)
.await
.map_err(|e| {
@ -213,7 +215,7 @@ pub async fn signin_token_only_flow(
request: user_api::SignInRequest,
) -> UserResponse<user_api::TokenOrPayloadResponse<user_api::SignInResponse>> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(&request.email)
.await
.to_not_found_response(UserErrors::InvalidCredentials)?
@ -238,7 +240,7 @@ pub async fn connect_account(
state: SessionState,
request: user_api::ConnectAccountRequest,
) -> UserResponse<user_api::ConnectAccountResponse> {
let find_user = state.store.find_user_by_email(&request.email).await;
let find_user = state.global_store.find_user_by_email(&request.email).await;
if let Ok(found_user) = find_user {
let user_from_db: domain::UserFromStorage = found_user.into();
@ -344,7 +346,7 @@ pub async fn change_password(
user_from_token: auth::UserFromToken,
) -> UserResponse<()> {
let user: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_from_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -362,7 +364,7 @@ pub async fn change_password(
utils::user::password::generate_password_hash(new_password.get_secret())?;
let _ = state
.store
.global_store
.update_user_by_user_id(
user.get_user_id(),
diesel_models::user::UserUpdate::PasswordUpdate {
@ -401,7 +403,7 @@ pub async fn forgot_password(
let user_email = domain::UserEmail::from_pii_email(request.email)?;
let user_from_db = state
.store
.global_store
.find_user_by_email(&user_email.into_inner())
.await
.map_err(|e| {
@ -439,7 +441,7 @@ pub async fn rotate_password(
_req_state: ReqState,
) -> UserResponse<()> {
let user: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -453,7 +455,7 @@ pub async fn rotate_password(
}
let user = state
.store
.global_store
.update_user_by_user_id(
&user_token.user_id,
storage_user::UserUpdate::PasswordUpdate {
@ -484,7 +486,7 @@ pub async fn reset_password_token_only_flow(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -502,7 +504,7 @@ pub async fn reset_password_token_only_flow(
let hash_password = utils::user::password::generate_password_hash(password.get_secret())?;
let user = state
.store
.global_store
.update_user_by_email(
&email_token
.get_email()
@ -540,7 +542,7 @@ pub async fn reset_password(
let hash_password = utils::user::password::generate_password_hash(password.get_secret())?;
let user = state
.store
.global_store
.update_user_by_email(
&email_token
.get_email()
@ -638,7 +640,7 @@ async fn handle_invitation(
let invitee_email = domain::UserEmail::from_pii_email(request.email.clone())?;
let invitee_user = state
.store
.global_store
.find_user_by_email(&invitee_email.into_inner())
.await;
@ -745,7 +747,7 @@ async fn handle_new_user_invitation(
let new_user = domain::NewUser::try_from((request.clone(), user_from_token.clone()))?;
new_user
.insert_user_in_db(state.store.as_ref())
.insert_user_in_db(state.global_store.as_ref())
.await
.change_context(UserErrors::InternalServerError)?;
@ -855,7 +857,7 @@ pub async fn resend_invite(
) -> UserResponse<()> {
let invitee_email = domain::UserEmail::from_pii_email(request.email)?;
let user: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(&invitee_email.clone().into_inner())
.await
.map_err(|e| {
@ -922,7 +924,7 @@ pub async fn accept_invite_from_email(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -954,7 +956,7 @@ pub async fn accept_invite_from_email(
.map_err(|e| logger::error!(?e));
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.update_user_by_user_id(user.get_user_id(), storage_user::UserUpdate::VerifyUser)
.await
.change_context(UserErrors::InternalServerError)?
@ -990,7 +992,7 @@ pub async fn accept_invite_from_email_token_only_flow(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -1082,7 +1084,7 @@ pub async fn create_internal_user(
store_user.set_is_verified(true);
state
.store
.global_store
.insert_user(store_user)
.await
.map_err(|e| {
@ -1316,19 +1318,40 @@ pub async fn list_users_for_merchant_account(
state: SessionState,
user_from_token: auth::UserFromToken,
) -> UserResponse<user_api::ListUsersResponse> {
let users_and_user_roles = state
let user_roles: HashMap<String, _> = state
.store
.find_users_and_roles_by_merchant_id(user_from_token.merchant_id.as_str())
.list_user_roles_by_merchant_id(user_from_token.merchant_id.as_str())
.await
.change_context(UserErrors::InternalServerError)
.attach_printable("No user roles for given merchant id")?
.into_iter()
.map(|role| (role.user_id.clone(), role))
.collect();
let user_ids = user_roles.keys().cloned().collect::<Vec<_>>();
let users = state
.global_store
.find_users_by_user_ids(user_ids)
.await
.change_context(UserErrors::InternalServerError)
.attach_printable("No users for given merchant id")?;
let users_and_user_roles: Vec<_> = users
.into_iter()
.filter_map(|user| {
user_roles
.get(&user.user_id)
.map(|role| (user.clone(), role.clone()))
})
.collect();
let users_user_roles_and_roles =
futures::future::try_join_all(users_and_user_roles.into_iter().map(
|(user, user_role)| async {
roles::RoleInfo::from_role_id(
&state,
&user_role.role_id,
&user_role.role_id.clone(),
&user_role.merchant_id,
&user_role.org_id,
)
@ -1346,7 +1369,7 @@ pub async fn list_users_for_merchant_account(
user_api::UserDetails {
email: user.get_email(),
name: user.get_name(),
role_id: user_role.role_id,
role_id: user_role.role_id.clone(),
role_name: role_info.get_role_name().to_string(),
status: user_role.status.foreign_into(),
last_modified_at: user_role.last_modified,
@ -1372,7 +1395,7 @@ pub async fn verify_email(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -1382,7 +1405,7 @@ pub async fn verify_email(
.change_context(UserErrors::InternalServerError)?;
let user = state
.store
.global_store
.update_user_by_user_id(user.user_id.as_str(), storage_user::UserUpdate::VerifyUser)
.await
.change_context(UserErrors::InternalServerError)?;
@ -1432,7 +1455,7 @@ pub async fn verify_email_token_only_flow(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user_from_email = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -1446,7 +1469,7 @@ pub async fn verify_email_token_only_flow(
}
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.update_user_by_user_id(
user_from_email.user_id.as_str(),
storage_user::UserUpdate::VerifyUser,
@ -1483,7 +1506,7 @@ pub async fn send_verification_mail(
) -> UserResponse<()> {
let user_email = domain::UserEmail::try_from(req.email)?;
let user = state
.store
.global_store
.find_user_by_email(&user_email.into_inner())
.await
.map_err(|e| {
@ -1522,7 +1545,7 @@ pub async fn verify_token(
req: auth::ReconUser,
) -> UserResponse<user_api::VerifyTokenResponse> {
let user = state
.store
.global_store
.find_user_by_id(&req.user_id)
.await
.map_err(|e| {
@ -1552,7 +1575,7 @@ pub async fn update_user_details(
_req_state: ReqState,
) -> UserResponse<()> {
let user: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1581,7 +1604,7 @@ pub async fn update_user_details(
};
state
.store
.global_store
.update_user_by_user_id(user.get_user_id(), user_update)
.await
.change_context(UserErrors::InternalServerError)?;
@ -1602,7 +1625,7 @@ pub async fn user_from_email(
auth::blacklist::check_email_token_in_blacklist(&state, &token).await?;
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(
&email_token
.get_email()
@ -1629,7 +1652,7 @@ pub async fn begin_totp(
user_token: auth::UserFromSinglePurposeToken,
) -> UserResponse<user_api::BeginTotpResponse> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1662,7 +1685,7 @@ pub async fn reset_totp(
user_token: auth::UserFromToken,
) -> UserResponse<user_api::BeginTotpResponse> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1701,7 +1724,7 @@ pub async fn verify_totp(
req: user_api::VerifyTotpRequest,
) -> UserResponse<user_api::TokenResponse> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1741,7 +1764,7 @@ pub async fn update_totp(
req: user_api::VerifyTotpRequest,
) -> UserResponse<()> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1768,7 +1791,7 @@ pub async fn update_totp(
let key_store = user_from_db.get_or_create_key_store(&state).await?;
state
.store
.global_store
.update_user_by_user_id(
&user_token.user_id,
storage_user::UserUpdate::TotpUpdate {
@ -1815,7 +1838,7 @@ pub async fn generate_recovery_codes(
let recovery_codes = domain::RecoveryCodes::generate_new();
state
.store
.global_store
.update_user_by_user_id(
&user_token.user_id,
storage_user::UserUpdate::TotpUpdate {
@ -1842,7 +1865,7 @@ pub async fn verify_recovery_code(
req: user_api::VerifyRecoveryCodeRequest,
) -> UserResponse<user_api::TokenResponse> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1866,7 +1889,7 @@ pub async fn verify_recovery_code(
let _ = recovery_codes.remove(matching_index);
state
.store
.global_store
.update_user_by_user_id(
user_from_db.get_user_id(),
storage_user::UserUpdate::TotpUpdate {
@ -1887,7 +1910,7 @@ pub async fn terminate_two_factor_auth(
skip_two_factor_auth: bool,
) -> UserResponse<user_api::TokenResponse> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(&user_token.user_id)
.await
.change_context(UserErrors::InternalServerError)?
@ -1906,7 +1929,7 @@ pub async fn terminate_two_factor_auth(
if user_from_db.get_totp_status() != TotpStatus::Set {
state
.store
.global_store
.update_user_by_user_id(
user_from_db.get_user_id(),
storage_user::UserUpdate::TotpUpdate {

View File

@ -228,7 +228,7 @@ pub async fn merchant_select(
if let Some(true) = req.need_dashboard_entry_response {
let user_from_db = state
.store
.global_store
.find_user_by_id(user_token.user_id.as_str())
.await
.change_context(UserErrors::InternalServerError)?
@ -281,7 +281,7 @@ pub async fn merchant_select_token_only_flow(
.ok_or(UserErrors::MerchantIdNotFound)?;
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_id(user_token.user_id.as_str())
.await
.change_context(UserErrors::InternalServerError)?
@ -309,7 +309,7 @@ pub async fn delete_user_role(
_req_state: ReqState,
) -> UserResponse<()> {
let user_from_db: domain::UserFromStorage = state
.store
.global_store
.find_user_by_email(&domain::UserEmail::from_pii_email(request.email)?.into_inner())
.await
.map_err(|e| {
@ -369,7 +369,7 @@ pub async fn delete_user_role(
.attach_printable("Error while deleting user role")?
} else {
state
.store
.global_store
.delete_user_by_user_id(user_from_db.get_user_id())
.await
.change_context(UserErrors::InternalServerError)

View File

@ -112,13 +112,11 @@ pub trait StorageInterface:
+ OrganizationInterface
+ routing_algorithm::RoutingAlgorithmInterface
+ gsm::GsmInterface
+ user::UserInterface
+ user_role::UserRoleInterface
+ authorization::AuthorizationInterface
+ user::sample_data::BatchSampleDataInterface
+ health_check::HealthCheckDbInterface
+ role::RoleInterface
+ user_key_store::UserKeyStoreInterface
+ authentication::AuthenticationInterface
+ 'static
{
@ -127,6 +125,22 @@ pub trait StorageInterface:
fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)>;
}
#[async_trait::async_trait]
pub trait GlobalStorageInterface:
Send
+ Sync
+ dyn_clone::DynClone
+ user::UserInterface
+ user_key_store::UserKeyStoreInterface
+ 'static
{
}
pub trait CommonStorageInterface: StorageInterface + GlobalStorageInterface {
fn get_storage_interface(&self) -> Box<dyn StorageInterface>;
fn get_global_storage_interface(&self) -> Box<dyn GlobalStorageInterface>;
}
pub trait MasterKeyInterface {
fn get_master_key(&self) -> &[u8];
}
@ -158,6 +172,9 @@ impl StorageInterface for Store {
}
}
#[async_trait::async_trait]
impl GlobalStorageInterface for Store {}
#[async_trait::async_trait]
impl StorageInterface for MockDb {
fn get_scheduler_db(&self) -> Box<dyn scheduler::SchedulerInterface> {
@ -169,6 +186,27 @@ impl StorageInterface for MockDb {
}
}
#[async_trait::async_trait]
impl GlobalStorageInterface for MockDb {}
impl CommonStorageInterface for MockDb {
fn get_global_storage_interface(&self) -> Box<dyn GlobalStorageInterface> {
Box::new(self.clone())
}
fn get_storage_interface(&self) -> Box<dyn StorageInterface> {
Box::new(self.clone())
}
}
impl CommonStorageInterface for Store {
fn get_global_storage_interface(&self) -> Box<dyn GlobalStorageInterface> {
Box::new(self.clone())
}
fn get_storage_interface(&self) -> Box<dyn StorageInterface> {
Box::new(self.clone())
}
}
pub trait RequestIdStore {
fn add_request_id(&mut self, _request_id: String) {}
fn get_request_id(&self) -> Option<String> {
@ -205,6 +243,7 @@ where
}
dyn_clone::clone_trait_object!(StorageInterface);
dyn_clone::clone_trait_object!(GlobalStorageInterface);
impl RequestIdStore for KafkaStore {
fn add_request_id(&mut self, request_id: String) {

View File

@ -66,7 +66,7 @@ use crate::{
refund::RefundInterface,
reverse_lookup::ReverseLookupInterface,
routing_algorithm::RoutingAlgorithmInterface,
MasterKeyInterface, StorageInterface,
CommonStorageInterface, GlobalStorageInterface, MasterKeyInterface, StorageInterface,
},
services::{authentication, kafka::KafkaProducer, Store},
types::{
@ -2307,6 +2307,17 @@ impl StorageInterface for KafkaStore {
}
}
impl GlobalStorageInterface for KafkaStore {}
impl CommonStorageInterface for KafkaStore {
fn get_storage_interface(&self) -> Box<dyn StorageInterface> {
Box::new(self.clone())
}
fn get_global_storage_interface(&self) -> Box<dyn GlobalStorageInterface> {
Box::new(self.clone())
}
}
#[async_trait::async_trait]
impl SchedulerInterface for KafkaStore {}
@ -2365,13 +2376,11 @@ impl UserInterface for KafkaStore {
self.diesel_store.delete_user_by_user_id(user_id).await
}
async fn find_users_and_roles_by_merchant_id(
async fn find_users_by_user_ids(
&self,
merchant_id: &str,
) -> CustomResult<Vec<(storage::User, user_storage::UserRole)>, errors::StorageError> {
self.diesel_store
.find_users_and_roles_by_merchant_id(merchant_id)
.await
user_ids: Vec<String>,
) -> CustomResult<Vec<storage::User>, errors::StorageError> {
self.diesel_store.find_users_by_user_ids(user_ids).await
}
}
@ -2456,6 +2465,15 @@ impl UserRoleInterface for KafkaStore {
.transfer_org_ownership_between_users(from_user_id, to_user_id, org_id)
.await
}
async fn list_user_roles_by_merchant_id(
&self,
user_id: &str,
) -> CustomResult<Vec<user_storage::UserRole>, errors::StorageError> {
self.diesel_store
.list_user_roles_by_merchant_id(user_id)
.await
}
}
#[async_trait::async_trait]

View File

@ -1,4 +1,4 @@
use diesel_models::{user as storage, user_role::UserRole};
use diesel_models::user as storage;
use error_stack::{report, ResultExt};
use masking::Secret;
use router_env::{instrument, tracing};
@ -46,10 +46,10 @@ pub trait UserInterface {
user_id: &str,
) -> CustomResult<bool, errors::StorageError>;
async fn find_users_and_roles_by_merchant_id(
async fn find_users_by_user_ids(
&self,
merchant_id: &str,
) -> CustomResult<Vec<(storage::User, UserRole)>, errors::StorageError>;
user_ids: Vec<String>,
) -> CustomResult<Vec<storage::User>, errors::StorageError>;
}
#[async_trait::async_trait]
@ -123,13 +123,12 @@ impl UserInterface for Store {
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn find_users_and_roles_by_merchant_id(
async fn find_users_by_user_ids(
&self,
merchant_id: &str,
) -> CustomResult<Vec<(storage::User, UserRole)>, errors::StorageError> {
user_ids: Vec<String>,
) -> CustomResult<Vec<storage::User>, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::User::find_joined_users_and_roles_by_merchant_id(&conn, merchant_id)
storage::User::find_users_by_user_ids(&conn, user_ids)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
@ -330,10 +329,10 @@ impl UserInterface for MockDb {
Ok(true)
}
async fn find_users_and_roles_by_merchant_id(
async fn find_users_by_user_ids(
&self,
_merchant_id: &str,
) -> CustomResult<Vec<(storage::User, UserRole)>, errors::StorageError> {
_user_ids: Vec<String>,
) -> CustomResult<Vec<storage::User>, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}

View File

@ -55,6 +55,11 @@ pub trait UserRoleInterface {
user_id: &str,
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError>;
async fn list_user_roles_by_merchant_id(
&self,
merchant_id: &str,
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError>;
async fn transfer_org_ownership_between_users(
&self,
from_user_id: &str,
@ -168,6 +173,17 @@ impl UserRoleInterface for Store {
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn list_user_roles_by_merchant_id(
&self,
merchant_id: &str,
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::UserRole::list_by_merchant_id(&conn, merchant_id.to_owned())
.await
.map_err(|error| report!(errors::StorageError::from(error)))
}
#[instrument(skip_all)]
async fn transfer_org_ownership_between_users(
&self,
@ -492,6 +508,24 @@ impl UserRoleInterface for MockDb {
})
.collect())
}
async fn list_user_roles_by_merchant_id(
&self,
merchant_id: &str,
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError> {
let user_roles = self.user_roles.lock().await;
Ok(user_roles
.iter()
.cloned()
.filter_map(|ele| {
if ele.merchant_id == merchant_id {
return Some(ele);
}
None
})
.collect())
}
}
#[cfg(feature = "kafka_events")]
@ -534,4 +568,12 @@ impl UserRoleInterface for super::KafkaStore {
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError> {
self.diesel_store.list_user_roles_by_user_id(user_id).await
}
async fn list_user_roles_by_merchant_id(
&self,
merchant_id: &str,
) -> CustomResult<Vec<storage::UserRole>, errors::StorageError> {
self.diesel_store
.list_user_roles_by_merchant_id(merchant_id)
.await
}
}

View File

@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use api_models::routing::RoutingRetrieveQuery;
#[cfg(feature = "olap")]
use common_enums::TransactionType;
use common_utils::consts::{DEFAULT_TENANT, GLOBAL_TENANT};
#[cfg(feature = "email")]
use external_services::email::{ses::AwsSes, EmailService};
use external_services::file_storage::FileStorageInterface;
@ -54,7 +55,7 @@ use crate::routes::verify_connector::payment_connector_verify;
pub use crate::{
configs::settings,
core::routing,
db::{StorageImpl, StorageInterface},
db::{CommonStorageInterface, GlobalStorageInterface, StorageImpl, StorageInterface},
events::EventsHandler,
routes::cards_info::card_iin_info,
services::{get_cache_store, get_store},
@ -72,6 +73,8 @@ pub struct ReqState {
#[derive(Clone)]
pub struct SessionState {
pub store: Box<dyn StorageInterface>,
/// Global store is used for global schema operations in tables like Users and Tenants
pub global_store: Box<dyn GlobalStorageInterface>,
pub conf: Arc<settings::Settings<RawSecret>>,
pub api_client: Box<dyn crate::services::ApiClient>,
pub event_handler: EventsHandler,
@ -129,6 +132,7 @@ impl SessionStateInfo for SessionState {
#[derive(Clone)]
pub struct AppState {
pub flow_name: String,
pub global_store: Box<dyn GlobalStorageInterface>,
pub stores: HashMap<String, Box<dyn StorageInterface>>,
pub conf: Arc<settings::Settings<RawSecret>>,
pub event_handler: EventsHandler,
@ -252,6 +256,24 @@ impl AppState {
let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable)
.await
.expect("Failed to create store");
let global_tenant = if conf.multitenancy.enabled {
GLOBAL_TENANT
} else {
DEFAULT_TENANT
};
let global_store: Box<dyn GlobalStorageInterface> = Self::get_store_interface(
&storage_impl,
&event_handler,
&conf,
&settings::GlobalTenant {
schema: global_tenant.to_string(),
redis_key_prefix: String::default(),
},
Arc::clone(&cache_store),
testable,
)
.await
.get_global_storage_interface();
for (tenant_name, tenant) in conf.clone().multitenancy.get_tenants() {
let store: Box<dyn StorageInterface> = Self::get_store_interface(
&storage_impl,
@ -261,7 +283,8 @@ impl AppState {
Arc::clone(&cache_store),
testable,
)
.await;
.await
.get_storage_interface();
stores.insert(tenant_name.clone(), store);
#[cfg(feature = "olap")]
let pool =
@ -279,6 +302,7 @@ impl AppState {
Self {
flow_name: String::from("default"),
stores,
global_store,
conf: Arc::new(conf),
#[cfg(feature = "email")]
email_client,
@ -300,10 +324,10 @@ impl AppState {
storage_impl: &StorageImpl,
event_handler: &EventsHandler,
conf: &Settings,
tenant: &settings::Tenant,
tenant: &dyn TenantConfig,
cache_store: Arc<RedisStore>,
testable: bool,
) -> Box<dyn StorageInterface> {
) -> Box<dyn CommonStorageInterface> {
match storage_impl {
StorageImpl::Postgresql | StorageImpl::PostgresqlTest => match event_handler {
EventsHandler::Kafka(kafka_client) => Box::new(
@ -358,6 +382,7 @@ impl AppState {
{
Ok(SessionState {
store: self.stores.get(tenant).ok_or_else(err)?.clone(),
global_store: self.global_store.clone(),
conf: Arc::clone(&self.conf),
api_client: self.api_client.clone(),
event_handler: self.event_handler.clone(),

View File

@ -73,8 +73,9 @@ pub async fn send_recon_request(
state: SessionState,
user: UserFromToken,
) -> RouterResponse<recon_api::ReconStatusResponse> {
let global_db = &*state.global_store;
let db = &*state.store;
let user_from_db = db
let user_from_db = global_db
.find_user_by_id(&user.user_id)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;
@ -220,7 +221,7 @@ pub async fn generate_recon_token(
state: SessionState,
req: ReconUser,
) -> RouterResponse<recon_api::ReconTokenResponse> {
let db = &*state.store;
let db = &*state.global_store;
let user = db
.find_user_by_id(&req.user_id)
.await

View File

@ -20,7 +20,7 @@ use hyperswitch_domain_models::errors::StorageResult;
use masking::{ExposeInterface, StrongSecret};
#[cfg(feature = "kv_store")]
use storage_impl::KVRouterStore;
use storage_impl::{redis::RedisStore, RouterStore};
use storage_impl::{config::TenantConfig, redis::RedisStore, RouterStore};
use tokio::sync::oneshot;
pub use self::{api::*, encryption::*};
@ -42,7 +42,7 @@ pub type Store = KVRouterStore<StoreType>;
#[allow(clippy::expect_used)]
pub async fn get_store(
config: &Settings,
tenant: &crate::configs::settings::Tenant,
tenant: &dyn TenantConfig,
cache_store: Arc<RedisStore>,
test_transaction: bool,
) -> StorageResult<Store> {

View File

@ -29,7 +29,7 @@ use crate::{
admin,
errors::{self, UserErrors, UserResult},
},
db::StorageInterface,
db::GlobalStorageInterface,
routes::SessionState,
services::{self, authentication as auth, authentication::UserFromToken, authorization::info},
types::transformers::ForeignFrom,
@ -545,7 +545,7 @@ impl NewUser {
pub async fn insert_user_in_db(
&self,
db: &dyn StorageInterface,
db: &dyn GlobalStorageInterface,
) -> UserResult<UserFromStorage> {
match db.insert_user(self.clone().try_into()?).await {
Ok(user) => Ok(user.into()),
@ -562,7 +562,7 @@ impl NewUser {
pub async fn check_if_already_exists_in_db(&self, state: SessionState) -> UserResult<()> {
if state
.store
.global_store
.find_user_by_email(&self.get_email().into_inner())
.await
.is_ok()
@ -577,7 +577,7 @@ impl NewUser {
state: SessionState,
) -> UserResult<UserFromStorage> {
self.check_if_already_exists_in_db(state.clone()).await?;
let db = state.store.as_ref();
let db = state.global_store.as_ref();
let merchant_id = self.get_new_merchant().get_merchant_id();
self.new_merchant
.create_new_merchant_and_insert_in_db(state.clone())
@ -898,7 +898,7 @@ impl UserFromStorage {
pub async fn get_or_create_key_store(&self, state: &SessionState) -> UserResult<UserKeyStore> {
let master_key = state.store.get_master_key();
let key_store_result = state
.store
.global_store
.get_user_key_store_by_user_id(self.get_user_id(), &master_key.to_vec().into())
.await;
@ -922,7 +922,7 @@ impl UserFromStorage {
created_at: common_utils::date_time::now(),
};
state
.store
.global_store
.insert_user_key_store(key_store, &master_key.to_vec().into())
.await
.change_context(UserErrors::InternalServerError)
@ -951,7 +951,7 @@ impl UserFromStorage {
}
let user_key_store = state
.store
.global_store
.get_user_key_store_by_user_id(
self.get_user_id(),
&state.store.get_master_key().to_vec().into(),

View File

@ -57,7 +57,7 @@ impl UserFromToken {
pub async fn get_user_from_db(&self, state: &SessionState) -> UserResult<UserFromStorage> {
let user = state
.store
.global_store
.find_user_by_id(&self.user_id)
.await
.change_context(UserErrors::InternalServerError)?;
@ -180,7 +180,7 @@ pub async fn get_user_from_db_by_email(
email: domain::UserEmail,
) -> CustomResult<UserFromStorage, StorageError> {
state
.store
.global_store
.find_user_by_email(&email.into_inner())
.await
.map(UserFromStorage::from)