mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 17:19:15 +08:00
refactor(tenant): use tenant id type (#6643)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -12,6 +12,7 @@ mod payment;
|
||||
mod profile;
|
||||
mod refunds;
|
||||
mod routing;
|
||||
mod tenant;
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
mod global_id;
|
||||
@ -40,6 +41,7 @@ pub use profile::ProfileId;
|
||||
pub use refunds::RefundReferenceId;
|
||||
pub use routing::RoutingId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use tenant::TenantId;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::{fp_utils::when, generate_id_with_default_len};
|
||||
|
||||
@ -18,7 +18,7 @@ crate::impl_to_sql_from_sql_id_type!(OrganizationId);
|
||||
|
||||
impl OrganizationId {
|
||||
/// Get an organization id from String
|
||||
pub fn wrap(org_id: String) -> CustomResult<Self, ValidationError> {
|
||||
pub fn try_from_string(org_id: String) -> CustomResult<Self, ValidationError> {
|
||||
Self::try_from(std::borrow::Cow::from(org_id))
|
||||
}
|
||||
}
|
||||
|
||||
22
crates/common_utils/src/id_type/tenant.rs
Normal file
22
crates/common_utils/src/id_type/tenant.rs
Normal file
@ -0,0 +1,22 @@
|
||||
use crate::errors::{CustomResult, ValidationError};
|
||||
|
||||
crate::id_type!(
|
||||
TenantId,
|
||||
"A type for tenant_id that can be used for unique identifier for a tenant"
|
||||
);
|
||||
crate::impl_id_type_methods!(TenantId, "tenant_id");
|
||||
|
||||
// This is to display the `TenantId` as TenantId(abcd)
|
||||
crate::impl_debug_id_type!(TenantId);
|
||||
crate::impl_try_from_cow_str_id_type!(TenantId, "tenant_id");
|
||||
|
||||
crate::impl_serializable_secret_id_type!(TenantId);
|
||||
crate::impl_queryable_id_type!(TenantId);
|
||||
crate::impl_to_sql_from_sql_id_type!(TenantId);
|
||||
|
||||
impl TenantId {
|
||||
/// Get tenant id from String
|
||||
pub fn try_from_string(tenant_id: String) -> CustomResult<Self, ValidationError> {
|
||||
Self::try_from(std::borrow::Cow::from(tenant_id))
|
||||
}
|
||||
}
|
||||
@ -12,15 +12,15 @@ pub enum ThemeLineage {
|
||||
// },
|
||||
/// Org lineage variant
|
||||
Organization {
|
||||
/// tenant_id: String
|
||||
tenant_id: String,
|
||||
/// tenant_id: TenantId
|
||||
tenant_id: id_type::TenantId,
|
||||
/// org_id: OrganizationId
|
||||
org_id: id_type::OrganizationId,
|
||||
},
|
||||
/// Merchant lineage variant
|
||||
Merchant {
|
||||
/// tenant_id: String
|
||||
tenant_id: String,
|
||||
/// tenant_id: TenantId
|
||||
tenant_id: id_type::TenantId,
|
||||
/// org_id: OrganizationId
|
||||
org_id: id_type::OrganizationId,
|
||||
/// merchant_id: MerchantId
|
||||
@ -28,8 +28,8 @@ pub enum ThemeLineage {
|
||||
},
|
||||
/// Profile lineage variant
|
||||
Profile {
|
||||
/// tenant_id: String
|
||||
tenant_id: String,
|
||||
/// tenant_id: TenantId
|
||||
tenant_id: id_type::TenantId,
|
||||
/// org_id: OrganizationId
|
||||
org_id: id_type::OrganizationId,
|
||||
/// merchant_id: MerchantId
|
||||
|
||||
@ -9,7 +9,7 @@ use crate::schema::themes;
|
||||
#[diesel(table_name = themes, primary_key(theme_id), check_for_backend(diesel::pg::Pg))]
|
||||
pub struct Theme {
|
||||
pub theme_id: String,
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub org_id: Option<id_type::OrganizationId>,
|
||||
pub merchant_id: Option<id_type::MerchantId>,
|
||||
pub profile_id: Option<id_type::ProfileId>,
|
||||
@ -23,7 +23,7 @@ pub struct Theme {
|
||||
#[diesel(table_name = themes)]
|
||||
pub struct ThemeNew {
|
||||
pub theme_id: String,
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub org_id: Option<id_type::OrganizationId>,
|
||||
pub merchant_id: Option<id_type::MerchantId>,
|
||||
pub profile_id: Option<id_type::ProfileId>,
|
||||
|
||||
@ -24,7 +24,7 @@ pub struct UserRole {
|
||||
pub entity_id: Option<String>,
|
||||
pub entity_type: Option<EntityType>,
|
||||
pub version: enums::UserRoleVersion,
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
}
|
||||
|
||||
impl UserRole {
|
||||
@ -88,7 +88,7 @@ pub struct UserRoleNew {
|
||||
pub entity_id: Option<String>,
|
||||
pub entity_type: Option<EntityType>,
|
||||
pub version: enums::UserRoleVersion,
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, AsChangeset, router_derive::DebugAsDisplay)]
|
||||
|
||||
@ -3,6 +3,7 @@ use std::{
|
||||
sync::{atomic, Arc},
|
||||
};
|
||||
|
||||
use common_utils::id_type;
|
||||
use router_env::tracing::Instrument;
|
||||
use tokio::{
|
||||
sync::{mpsc, oneshot},
|
||||
@ -34,12 +35,15 @@ pub struct HandlerInner {
|
||||
loop_interval: Duration,
|
||||
active_tasks: Arc<atomic::AtomicU64>,
|
||||
conf: DrainerSettings,
|
||||
stores: HashMap<String, Arc<Store>>,
|
||||
stores: HashMap<id_type::TenantId, Arc<Store>>,
|
||||
running: Arc<atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
pub fn from_conf(conf: DrainerSettings, stores: HashMap<String, Arc<Store>>) -> Self {
|
||||
pub fn from_conf(
|
||||
conf: DrainerSettings,
|
||||
stores: HashMap<id_type::TenantId, Arc<Store>>,
|
||||
) -> Self {
|
||||
let shutdown_interval = Duration::from_millis(conf.shutdown_interval.into());
|
||||
let loop_interval = Duration::from_millis(conf.loop_interval.into());
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use actix_web::{web, Scope};
|
||||
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
|
||||
use common_utils::errors::CustomResult;
|
||||
use common_utils::{errors::CustomResult, id_type};
|
||||
use diesel_models::{Config, ConfigNew};
|
||||
use error_stack::ResultExt;
|
||||
use router_env::{instrument, logger, tracing};
|
||||
@ -20,7 +20,7 @@ pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")];
|
||||
pub struct Health;
|
||||
|
||||
impl Health {
|
||||
pub fn server(conf: Settings, stores: HashMap<String, Arc<Store>>) -> Scope {
|
||||
pub fn server(conf: Settings, stores: HashMap<id_type::TenantId, Arc<Store>>) -> Scope {
|
||||
web::scope("health")
|
||||
.app_data(web::Data::new(conf))
|
||||
.app_data(web::Data::new(stores))
|
||||
|
||||
@ -14,7 +14,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
mod secrets_transformers;
|
||||
|
||||
use actix_web::dev::Server;
|
||||
use common_utils::signals::get_allowed_signals;
|
||||
use common_utils::{id_type, signals::get_allowed_signals};
|
||||
use diesel_models::kv;
|
||||
use error_stack::ResultExt;
|
||||
use hyperswitch_interfaces::secrets_interface::secret_state::RawSecret;
|
||||
@ -31,7 +31,7 @@ use crate::{
|
||||
};
|
||||
|
||||
pub async fn start_drainer(
|
||||
stores: HashMap<String, Arc<Store>>,
|
||||
stores: HashMap<id_type::TenantId, Arc<Store>>,
|
||||
conf: DrainerSettings,
|
||||
) -> errors::DrainerResult<()> {
|
||||
let drainer_handler = handler::Handler::from_conf(conf, stores);
|
||||
@ -62,7 +62,7 @@ pub async fn start_drainer(
|
||||
|
||||
pub async fn start_web_server(
|
||||
conf: Settings,
|
||||
stores: HashMap<String, Arc<Store>>,
|
||||
stores: HashMap<id_type::TenantId, Arc<Store>>,
|
||||
) -> Result<Server, errors::DrainerError> {
|
||||
let server = conf.server.clone();
|
||||
let web_server = actix_web::HttpServer::new(move || {
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc};
|
||||
|
||||
use common_utils::{ext_traits::ConfigExt, DbConnectionParams};
|
||||
use common_utils::{ext_traits::ConfigExt, id_type, DbConnectionParams};
|
||||
use config::{Environment, File};
|
||||
use external_services::managers::{
|
||||
encryption_management::EncryptionManagementConfig, secrets_management::SecretsManagementConfig,
|
||||
@ -122,23 +122,23 @@ pub struct Multitenancy {
|
||||
pub tenants: TenantConfig,
|
||||
}
|
||||
impl Multitenancy {
|
||||
pub fn get_tenants(&self) -> &HashMap<String, Tenant> {
|
||||
pub fn get_tenants(&self) -> &HashMap<id_type::TenantId, Tenant> {
|
||||
&self.tenants.0
|
||||
}
|
||||
pub fn get_tenant_ids(&self) -> Vec<String> {
|
||||
pub fn get_tenant_ids(&self) -> Vec<id_type::TenantId> {
|
||||
self.tenants
|
||||
.0
|
||||
.values()
|
||||
.map(|tenant| tenant.tenant_id.clone())
|
||||
.collect()
|
||||
}
|
||||
pub fn get_tenant(&self, tenant_id: &str) -> Option<&Tenant> {
|
||||
pub fn get_tenant(&self, tenant_id: &id_type::TenantId) -> Option<&Tenant> {
|
||||
self.tenants.0.get(tenant_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TenantConfig(pub HashMap<String, Tenant>);
|
||||
pub struct TenantConfig(pub HashMap<id_type::TenantId, Tenant>);
|
||||
|
||||
impl<'de> Deserialize<'de> for TenantConfig {
|
||||
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
@ -150,7 +150,7 @@ impl<'de> Deserialize<'de> for TenantConfig {
|
||||
clickhouse_database: String,
|
||||
}
|
||||
|
||||
let hashmap = <HashMap<String, Inner>>::deserialize(deserializer)?;
|
||||
let hashmap = <HashMap<id_type::TenantId, Inner>>::deserialize(deserializer)?;
|
||||
|
||||
Ok(Self(
|
||||
hashmap
|
||||
@ -172,9 +172,9 @@ impl<'de> Deserialize<'de> for TenantConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct Tenant {
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub base_url: String,
|
||||
pub schema: String,
|
||||
pub redis_key_prefix: String,
|
||||
|
||||
@ -6,7 +6,7 @@ use std::{
|
||||
#[cfg(feature = "olap")]
|
||||
use analytics::{opensearch::OpenSearchConfig, ReportConfig};
|
||||
use api_models::{enums, payment_methods::RequiredFieldInfo};
|
||||
use common_utils::ext_traits::ConfigExt;
|
||||
use common_utils::{ext_traits::ConfigExt, id_type};
|
||||
use config::{Environment, File};
|
||||
use error_stack::ResultExt;
|
||||
#[cfg(feature = "email")]
|
||||
@ -138,17 +138,17 @@ pub struct Multitenancy {
|
||||
}
|
||||
|
||||
impl Multitenancy {
|
||||
pub fn get_tenants(&self) -> &HashMap<String, Tenant> {
|
||||
pub fn get_tenants(&self) -> &HashMap<id_type::TenantId, Tenant> {
|
||||
&self.tenants.0
|
||||
}
|
||||
pub fn get_tenant_ids(&self) -> Vec<String> {
|
||||
pub fn get_tenant_ids(&self) -> Vec<id_type::TenantId> {
|
||||
self.tenants
|
||||
.0
|
||||
.values()
|
||||
.map(|tenant| tenant.tenant_id.clone())
|
||||
.collect()
|
||||
}
|
||||
pub fn get_tenant(&self, tenant_id: &str) -> Option<&Tenant> {
|
||||
pub fn get_tenant(&self, tenant_id: &id_type::TenantId) -> Option<&Tenant> {
|
||||
self.tenants.0.get(tenant_id)
|
||||
}
|
||||
}
|
||||
@ -159,11 +159,11 @@ pub struct DecisionConfig {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TenantConfig(pub HashMap<String, Tenant>);
|
||||
pub struct TenantConfig(pub HashMap<id_type::TenantId, Tenant>);
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Tenant {
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub base_url: String,
|
||||
pub schema: String,
|
||||
pub redis_key_prefix: String,
|
||||
@ -743,8 +743,7 @@ pub struct LockerBasedRecipientConnectorList {
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
pub struct ConnectorRequestReferenceIdConfig {
|
||||
pub merchant_ids_send_payment_id_as_connector_request_id:
|
||||
HashSet<common_utils::id_type::MerchantId>,
|
||||
pub merchant_ids_send_payment_id_as_connector_request_id: HashSet<id_type::MerchantId>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
@ -970,7 +969,7 @@ pub struct ServerTls {
|
||||
#[cfg(feature = "v2")]
|
||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||
pub struct CellInformation {
|
||||
pub id: common_utils::id_type::CellId,
|
||||
pub id: id_type::CellId,
|
||||
}
|
||||
|
||||
#[cfg(feature = "v2")]
|
||||
@ -981,8 +980,8 @@ impl Default for CellInformation {
|
||||
// around the time of deserializing application settings.
|
||||
// And a panic at application startup is considered acceptable.
|
||||
#[allow(clippy::expect_used)]
|
||||
let cell_id = common_utils::id_type::CellId::from_string("defid")
|
||||
.expect("Failed to create a default for Cell Id");
|
||||
let cell_id =
|
||||
id_type::CellId::from_string("defid").expect("Failed to create a default for Cell Id");
|
||||
Self { id: cell_id }
|
||||
}
|
||||
}
|
||||
@ -1120,7 +1119,7 @@ impl<'de> Deserialize<'de> for TenantConfig {
|
||||
clickhouse_database: String,
|
||||
}
|
||||
|
||||
let hashmap = <HashMap<String, Inner>>::deserialize(deserializer)?;
|
||||
let hashmap = <HashMap<id_type::TenantId, Inner>>::deserialize(deserializer)?;
|
||||
|
||||
Ok(Self(
|
||||
hashmap
|
||||
|
||||
@ -389,7 +389,7 @@ pub async fn mk_add_locker_request_hs(
|
||||
locker: &settings::Locker,
|
||||
payload: &StoreLockerReq,
|
||||
locker_choice: api_enums::LockerChoice,
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
request_id: Option<RequestId>,
|
||||
) -> CustomResult<services::Request, errors::VaultError> {
|
||||
let payload = payload
|
||||
@ -409,7 +409,10 @@ pub async fn mk_add_locker_request_hs(
|
||||
url.push_str("/cards/add");
|
||||
let mut request = services::Request::new(services::Method::Post, &url);
|
||||
request.add_header(headers::CONTENT_TYPE, "application/json".into());
|
||||
request.add_header(headers::X_TENANT_ID, tenant_id.into());
|
||||
request.add_header(
|
||||
headers::X_TENANT_ID,
|
||||
tenant_id.get_string_repr().to_owned().into(),
|
||||
);
|
||||
if let Some(req_id) = request_id {
|
||||
request.add_header(
|
||||
headers::X_REQUEST_ID,
|
||||
@ -584,7 +587,7 @@ pub async fn mk_get_card_request_hs(
|
||||
merchant_id: &id_type::MerchantId,
|
||||
card_reference: &str,
|
||||
locker_choice: Option<api_enums::LockerChoice>,
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
request_id: Option<RequestId>,
|
||||
) -> CustomResult<services::Request, errors::VaultError> {
|
||||
let merchant_customer_id = customer_id.to_owned();
|
||||
@ -612,7 +615,10 @@ pub async fn mk_get_card_request_hs(
|
||||
url.push_str("/cards/retrieve");
|
||||
let mut request = services::Request::new(services::Method::Post, &url);
|
||||
request.add_header(headers::CONTENT_TYPE, "application/json".into());
|
||||
request.add_header(headers::X_TENANT_ID, tenant_id.into());
|
||||
request.add_header(
|
||||
headers::X_TENANT_ID,
|
||||
tenant_id.get_string_repr().to_owned().into(),
|
||||
);
|
||||
if let Some(req_id) = request_id {
|
||||
request.add_header(
|
||||
headers::X_REQUEST_ID,
|
||||
@ -665,7 +671,7 @@ pub async fn mk_delete_card_request_hs(
|
||||
customer_id: &id_type::CustomerId,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
card_reference: &str,
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
request_id: Option<RequestId>,
|
||||
) -> CustomResult<services::Request, errors::VaultError> {
|
||||
let merchant_customer_id = customer_id.to_owned();
|
||||
@ -691,7 +697,10 @@ pub async fn mk_delete_card_request_hs(
|
||||
url.push_str("/cards/delete");
|
||||
let mut request = services::Request::new(services::Method::Post, &url);
|
||||
request.add_header(headers::CONTENT_TYPE, "application/json".into());
|
||||
request.add_header(headers::X_TENANT_ID, tenant_id.into());
|
||||
request.add_header(
|
||||
headers::X_TENANT_ID,
|
||||
tenant_id.get_string_repr().to_owned().into(),
|
||||
);
|
||||
if let Some(req_id) = request_id {
|
||||
request.add_header(
|
||||
headers::X_REQUEST_ID,
|
||||
@ -711,7 +720,7 @@ pub async fn mk_delete_card_request_hs_by_id(
|
||||
id: &String,
|
||||
merchant_id: &id_type::MerchantId,
|
||||
card_reference: &str,
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
request_id: Option<RequestId>,
|
||||
) -> CustomResult<services::Request, errors::VaultError> {
|
||||
let merchant_customer_id = id.to_owned();
|
||||
@ -737,7 +746,10 @@ pub async fn mk_delete_card_request_hs_by_id(
|
||||
url.push_str("/cards/delete");
|
||||
let mut request = services::Request::new(services::Method::Post, &url);
|
||||
request.add_header(headers::CONTENT_TYPE, "application/json".into());
|
||||
request.add_header(headers::X_TENANT_ID, tenant_id.into());
|
||||
request.add_header(
|
||||
headers::X_TENANT_ID,
|
||||
tenant_id.get_string_repr().to_owned().into(),
|
||||
);
|
||||
if let Some(req_id) = request_id {
|
||||
request.add_header(
|
||||
headers::X_REQUEST_ID,
|
||||
@ -832,7 +844,7 @@ pub fn mk_crud_locker_request(
|
||||
locker: &settings::Locker,
|
||||
path: &str,
|
||||
req: api::TokenizePayloadEncrypted,
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
request_id: Option<RequestId>,
|
||||
) -> CustomResult<services::Request, errors::VaultError> {
|
||||
let mut url = locker.basilisk_host.to_owned();
|
||||
@ -840,7 +852,10 @@ pub fn mk_crud_locker_request(
|
||||
let mut request = services::Request::new(services::Method::Post, &url);
|
||||
request.add_default_headers();
|
||||
request.add_header(headers::CONTENT_TYPE, "application/json".into());
|
||||
request.add_header(headers::X_TENANT_ID, tenant_id.into());
|
||||
request.add_header(
|
||||
headers::X_TENANT_ID,
|
||||
tenant_id.get_string_repr().to_owned().into(),
|
||||
);
|
||||
if let Some(req_id) = request_id {
|
||||
request.add_header(
|
||||
headers::X_REQUEST_ID,
|
||||
|
||||
@ -751,7 +751,10 @@ pub async fn push_metrics_with_update_window_for_success_based_routing(
|
||||
&metrics::CONTEXT,
|
||||
1,
|
||||
&add_attributes([
|
||||
("tenant", state.tenant.tenant_id.clone()),
|
||||
(
|
||||
"tenant",
|
||||
state.tenant.tenant_id.get_string_repr().to_owned(),
|
||||
),
|
||||
(
|
||||
"merchant_profile_id",
|
||||
format!(
|
||||
|
||||
@ -1120,11 +1120,15 @@ pub async fn create_internal_user(
|
||||
}
|
||||
})?;
|
||||
|
||||
let default_tenant_id = common_utils::consts::DEFAULT_TENANT.to_string();
|
||||
let default_tenant_id = common_utils::id_type::TenantId::try_from_string(
|
||||
common_utils::consts::DEFAULT_TENANT.to_owned(),
|
||||
)
|
||||
.change_context(UserErrors::InternalServerError)
|
||||
.attach_printable("Unable to parse default tenant id")?;
|
||||
|
||||
if state.tenant.tenant_id != default_tenant_id {
|
||||
return Err(UserErrors::ForbiddenTenantId)
|
||||
.attach_printable("Operation allowed only for the default tenant.");
|
||||
.attach_printable("Operation allowed only for the default tenant");
|
||||
}
|
||||
|
||||
let internal_merchant_id = common_utils::id_type::MerchantId::get_internal_user_merchant_id(
|
||||
|
||||
@ -732,7 +732,10 @@ mod tests {
|
||||
))
|
||||
.await;
|
||||
let state = &Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let merchant_id =
|
||||
common_utils::id_type::MerchantId::try_from(std::borrow::Cow::from("merchant_1"))
|
||||
|
||||
@ -1502,8 +1502,12 @@ mod merchant_connector_account_cache_tests {
|
||||
Box::new(services::MockApiClient),
|
||||
))
|
||||
.await;
|
||||
|
||||
let state = &Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
#[allow(clippy::expect_used)]
|
||||
let db = MockDb::new(&redis_interface::RedisSettings::default())
|
||||
@ -1685,7 +1689,10 @@ mod merchant_connector_account_cache_tests {
|
||||
))
|
||||
.await;
|
||||
let state = &Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
#[allow(clippy::expect_used)]
|
||||
let db = MockDb::new(&redis_interface::RedisSettings::default())
|
||||
|
||||
@ -348,7 +348,10 @@ mod tests {
|
||||
))
|
||||
.await;
|
||||
let state = &Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
#[allow(clippy::expect_used)]
|
||||
let mock_db = MockDb::new(&redis_interface::RedisSettings::default())
|
||||
|
||||
@ -7,6 +7,7 @@ use api_models::routing::RoutingRetrieveQuery;
|
||||
use common_enums::TransactionType;
|
||||
#[cfg(feature = "partial-auth")]
|
||||
use common_utils::crypto::Blake3;
|
||||
use common_utils::id_type;
|
||||
#[cfg(feature = "email")]
|
||||
use external_services::email::{
|
||||
no_email::NoEmailClient, ses::AwsSes, smtp::SmtpServer, EmailClientConfigs, EmailService,
|
||||
@ -193,14 +194,14 @@ impl SessionStateInfo for SessionState {
|
||||
pub struct AppState {
|
||||
pub flow_name: String,
|
||||
pub global_store: Box<dyn GlobalStorageInterface>,
|
||||
pub stores: HashMap<String, Box<dyn StorageInterface>>,
|
||||
pub stores: HashMap<id_type::TenantId, Box<dyn StorageInterface>>,
|
||||
pub conf: Arc<settings::Settings<RawSecret>>,
|
||||
pub event_handler: EventsHandler,
|
||||
#[cfg(feature = "email")]
|
||||
pub email_client: Arc<Box<dyn EmailService>>,
|
||||
pub api_client: Box<dyn crate::services::ApiClient>,
|
||||
#[cfg(feature = "olap")]
|
||||
pub pools: HashMap<String, AnalyticsProvider>,
|
||||
pub pools: HashMap<id_type::TenantId, AnalyticsProvider>,
|
||||
#[cfg(feature = "olap")]
|
||||
pub opensearch_client: Arc<OpenSearchClient>,
|
||||
pub request_id: Option<RequestId>,
|
||||
@ -209,7 +210,7 @@ pub struct AppState {
|
||||
pub grpc_client: Arc<GrpcClients>,
|
||||
}
|
||||
impl scheduler::SchedulerAppState for AppState {
|
||||
fn get_tenants(&self) -> Vec<String> {
|
||||
fn get_tenants(&self) -> Vec<id_type::TenantId> {
|
||||
self.conf.multitenancy.get_tenant_ids()
|
||||
}
|
||||
}
|
||||
@ -328,7 +329,7 @@ impl AppState {
|
||||
);
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
let mut pools: HashMap<String, AnalyticsProvider> = HashMap::new();
|
||||
let mut pools: HashMap<id_type::TenantId, AnalyticsProvider> = HashMap::new();
|
||||
let mut stores = HashMap::new();
|
||||
#[allow(clippy::expect_used)]
|
||||
let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable)
|
||||
@ -443,7 +444,11 @@ impl AppState {
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn get_session_state<E, F>(self: Arc<Self>, tenant: &str, err: F) -> Result<SessionState, E>
|
||||
pub fn get_session_state<E, F>(
|
||||
self: Arc<Self>,
|
||||
tenant: &id_type::TenantId,
|
||||
err: F,
|
||||
) -> Result<SessionState, E>
|
||||
where
|
||||
F: FnOnce() -> E + Copy,
|
||||
{
|
||||
|
||||
@ -68,7 +68,7 @@ use crate::{
|
||||
api_logs::{ApiEvent, ApiEventMetric, ApiEventsType},
|
||||
connector_api_logs::ConnectorEvent,
|
||||
},
|
||||
logger,
|
||||
headers, logger,
|
||||
routes::{
|
||||
app::{AppStateInfo, ReqState, SessionStateInfo},
|
||||
metrics, AppState, SessionState,
|
||||
@ -722,30 +722,41 @@ where
|
||||
|
||||
let mut event_type = payload.get_api_event_type();
|
||||
let tenant_id = if !state.conf.multitenancy.enabled {
|
||||
DEFAULT_TENANT.to_string()
|
||||
common_utils::id_type::TenantId::try_from_string(DEFAULT_TENANT.to_owned())
|
||||
.attach_printable("Unable to get default tenant id")
|
||||
.change_context(errors::ApiErrorResponse::InternalServerError.switch())?
|
||||
} else {
|
||||
let request_tenant_id = incoming_request_header
|
||||
.get(TENANT_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.ok_or_else(|| errors::ApiErrorResponse::MissingTenantId.switch())?;
|
||||
.ok_or_else(|| errors::ApiErrorResponse::MissingTenantId.switch())
|
||||
.and_then(|header_value| {
|
||||
common_utils::id_type::TenantId::try_from_string(header_value.to_string()).map_err(
|
||||
|_| {
|
||||
errors::ApiErrorResponse::InvalidRequestData {
|
||||
message: format!("`{}` header is invalid", headers::X_TENANT_ID),
|
||||
}
|
||||
.switch()
|
||||
},
|
||||
)
|
||||
})?;
|
||||
|
||||
state
|
||||
.conf
|
||||
.multitenancy
|
||||
.get_tenant(request_tenant_id)
|
||||
.get_tenant(&request_tenant_id)
|
||||
.map(|tenant| tenant.tenant_id.clone())
|
||||
.ok_or(
|
||||
errors::ApiErrorResponse::InvalidTenant {
|
||||
tenant_id: request_tenant_id.to_string(),
|
||||
tenant_id: request_tenant_id.get_string_repr().to_string(),
|
||||
}
|
||||
.switch(),
|
||||
)?
|
||||
};
|
||||
|
||||
let mut session_state =
|
||||
Arc::new(app_state.clone()).get_session_state(tenant_id.as_str(), || {
|
||||
let mut session_state = Arc::new(app_state.clone()).get_session_state(&tenant_id, || {
|
||||
errors::ApiErrorResponse::InvalidTenant {
|
||||
tenant_id: tenant_id.clone(),
|
||||
tenant_id: tenant_id.get_string_repr().to_string(),
|
||||
}
|
||||
.switch()
|
||||
})?;
|
||||
@ -757,9 +768,10 @@ where
|
||||
.event_context
|
||||
.record_info(("flow".to_string(), flow.to_string()));
|
||||
|
||||
request_state
|
||||
.event_context
|
||||
.record_info(("tenant_id".to_string(), tenant_id.to_string()));
|
||||
request_state.event_context.record_info((
|
||||
"tenant_id".to_string(),
|
||||
tenant_id.get_string_repr().to_string(),
|
||||
));
|
||||
|
||||
// Currently auth failures are not recorded as API events
|
||||
let (auth_out, auth_type) = api_auth
|
||||
|
||||
@ -185,7 +185,7 @@ pub struct UserFromSinglePurposeToken {
|
||||
pub user_id: String,
|
||||
pub origin: domain::Origin,
|
||||
pub path: Vec<TokenPurpose>,
|
||||
pub tenant_id: Option<String>,
|
||||
pub tenant_id: Option<id_type::TenantId>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
@ -196,7 +196,7 @@ pub struct SinglePurposeToken {
|
||||
pub origin: domain::Origin,
|
||||
pub path: Vec<TokenPurpose>,
|
||||
pub exp: u64,
|
||||
pub tenant_id: Option<String>,
|
||||
pub tenant_id: Option<id_type::TenantId>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
@ -207,7 +207,7 @@ impl SinglePurposeToken {
|
||||
origin: domain::Origin,
|
||||
settings: &Settings,
|
||||
path: Vec<TokenPurpose>,
|
||||
tenant_id: Option<String>,
|
||||
tenant_id: Option<id_type::TenantId>,
|
||||
) -> UserResult<String> {
|
||||
let exp_duration =
|
||||
std::time::Duration::from_secs(consts::SINGLE_PURPOSE_TOKEN_TIME_IN_SECS);
|
||||
@ -232,7 +232,7 @@ pub struct AuthToken {
|
||||
pub exp: u64,
|
||||
pub org_id: id_type::OrganizationId,
|
||||
pub profile_id: id_type::ProfileId,
|
||||
pub tenant_id: Option<String>,
|
||||
pub tenant_id: Option<id_type::TenantId>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "olap")]
|
||||
@ -244,7 +244,7 @@ impl AuthToken {
|
||||
settings: &Settings,
|
||||
org_id: id_type::OrganizationId,
|
||||
profile_id: id_type::ProfileId,
|
||||
tenant_id: Option<String>,
|
||||
tenant_id: Option<id_type::TenantId>,
|
||||
) -> UserResult<String> {
|
||||
let exp_duration = std::time::Duration::from_secs(consts::JWT_TOKEN_TIME_IN_SECS);
|
||||
let exp = jwt::generate_exp(exp_duration)?.as_secs();
|
||||
@ -268,7 +268,7 @@ pub struct UserFromToken {
|
||||
pub role_id: String,
|
||||
pub org_id: id_type::OrganizationId,
|
||||
pub profile_id: id_type::ProfileId,
|
||||
pub tenant_id: Option<String>,
|
||||
pub tenant_id: Option<id_type::TenantId>,
|
||||
}
|
||||
|
||||
pub struct UserIdFromAuth {
|
||||
@ -282,7 +282,7 @@ pub struct SinglePurposeOrLoginToken {
|
||||
pub role_id: Option<String>,
|
||||
pub purpose: Option<TokenPurpose>,
|
||||
pub exp: u64,
|
||||
pub tenant_id: Option<String>,
|
||||
pub tenant_id: Option<id_type::TenantId>,
|
||||
}
|
||||
|
||||
pub trait AuthInfo {
|
||||
@ -1110,7 +1110,7 @@ impl<'a> HeaderMapStruct<'a> {
|
||||
self.get_mandatory_header_value_by_key(headers::X_ORGANIZATION_ID)
|
||||
.map(|val| val.to_owned())
|
||||
.and_then(|organization_id| {
|
||||
id_type::OrganizationId::wrap(organization_id).change_context(
|
||||
id_type::OrganizationId::try_from_string(organization_id).change_context(
|
||||
errors::ApiErrorResponse::InvalidRequestData {
|
||||
message: format!("`{}` header is invalid", headers::X_ORGANIZATION_ID),
|
||||
},
|
||||
|
||||
@ -112,12 +112,16 @@ pub fn check_permission(
|
||||
)
|
||||
}
|
||||
|
||||
pub fn check_tenant(token_tenant_id: Option<String>, header_tenant_id: &str) -> RouterResult<()> {
|
||||
pub fn check_tenant(
|
||||
token_tenant_id: Option<id_type::TenantId>,
|
||||
header_tenant_id: &id_type::TenantId,
|
||||
) -> RouterResult<()> {
|
||||
if let Some(tenant_id) = token_tenant_id {
|
||||
if tenant_id != header_tenant_id {
|
||||
if tenant_id != *header_tenant_id {
|
||||
return Err(ApiErrorResponse::InvalidJwtToken).attach_printable(format!(
|
||||
"Token tenant ID: '{}' does not match Header tenant ID: '{}'",
|
||||
tenant_id, header_tenant_id
|
||||
tenant_id.get_string_repr().to_owned(),
|
||||
header_tenant_id.get_string_repr().to_owned()
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1106,20 +1106,20 @@ pub struct NoLevel;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OrganizationLevel {
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub org_id: id_type::OrganizationId,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MerchantLevel {
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub org_id: id_type::OrganizationId,
|
||||
pub merchant_id: id_type::MerchantId,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProfileLevel {
|
||||
pub tenant_id: String,
|
||||
pub tenant_id: id_type::TenantId,
|
||||
pub org_id: id_type::OrganizationId,
|
||||
pub merchant_id: id_type::MerchantId,
|
||||
pub profile_id: id_type::ProfileId,
|
||||
@ -1156,7 +1156,7 @@ impl NewUserRole<NoLevel> {
|
||||
}
|
||||
|
||||
pub struct EntityInfo {
|
||||
tenant_id: String,
|
||||
tenant_id: id_type::TenantId,
|
||||
org_id: id_type::OrganizationId,
|
||||
merchant_id: Option<id_type::MerchantId>,
|
||||
profile_id: Option<id_type::ProfileId>,
|
||||
|
||||
@ -92,7 +92,7 @@ pub async fn generate_jwt_auth_token_with_attributes(
|
||||
org_id: id_type::OrganizationId,
|
||||
role_id: String,
|
||||
profile_id: id_type::ProfileId,
|
||||
tenant_id: Option<String>,
|
||||
tenant_id: Option<id_type::TenantId>,
|
||||
) -> UserResult<Secret<String>> {
|
||||
let token = AuthToken::new_token(
|
||||
user_id,
|
||||
|
||||
@ -18,7 +18,10 @@ async fn invalidate_existing_cache_success() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let cache_key = "cacheKey".to_string();
|
||||
let cache_key_value = "val".to_string();
|
||||
|
||||
@ -218,7 +218,10 @@ async fn payments_create_success() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
use router::connector::Aci;
|
||||
@ -265,7 +268,10 @@ async fn payments_create_failure() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let connector = utils::construct_connector_data_old(
|
||||
Box::new(Aci::new()),
|
||||
@ -328,7 +334,10 @@ async fn refund_for_successful_payments() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let connector_integration: services::BoxedPaymentConnectorIntegrationInterface<
|
||||
types::api::Authorize,
|
||||
@ -398,7 +407,10 @@ async fn refunds_create_failure() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let connector_integration: services::BoxedRefundConnectorIntegrationInterface<
|
||||
types::api::Execute,
|
||||
|
||||
@ -601,7 +601,10 @@ pub trait ConnectorActions: Connector {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let res = services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
@ -641,7 +644,10 @@ pub trait ConnectorActions: Connector {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let res = services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
@ -682,7 +688,10 @@ pub trait ConnectorActions: Connector {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let res = services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
@ -722,7 +731,10 @@ pub trait ConnectorActions: Connector {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let res = services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
@ -813,7 +825,10 @@ pub trait ConnectorActions: Connector {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let res = services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
@ -850,7 +865,10 @@ async fn call_connector<
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
services::api::execute_connector_processing_step(
|
||||
&state,
|
||||
|
||||
@ -295,7 +295,10 @@ async fn payments_create_core() {
|
||||
let merchant_id = id_type::MerchantId::try_from(Cow::from("juspay_merchant")).unwrap();
|
||||
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let key_manager_state = &(&state).into();
|
||||
let key_store = state
|
||||
@ -552,7 +555,10 @@ async fn payments_create_core_adyen_no_redirect() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let payment_id =
|
||||
|
||||
@ -56,7 +56,10 @@ async fn payments_create_core() {
|
||||
let merchant_id = id_type::MerchantId::try_from(Cow::from("juspay_merchant")).unwrap();
|
||||
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
let key_manager_state = &(&state).into();
|
||||
let key_store = state
|
||||
@ -321,7 +324,10 @@ async fn payments_create_core_adyen_no_redirect() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let customer_id = format!("cust_{}", Uuid::new_v4());
|
||||
|
||||
@ -18,7 +18,10 @@ async fn get_redis_conn_failure() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let _ = state.store.get_redis_conn().map(|conn| {
|
||||
@ -46,7 +49,10 @@ async fn get_redis_conn_success() {
|
||||
))
|
||||
.await;
|
||||
let state = Arc::new(app_state)
|
||||
.get_session_state("public", || {})
|
||||
.get_session_state(
|
||||
&common_utils::id_type::TenantId::try_from_string("public".to_string()).unwrap(),
|
||||
|| {},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Act
|
||||
|
||||
@ -7,7 +7,7 @@ use std::{
|
||||
pub mod types;
|
||||
pub mod workflows;
|
||||
|
||||
use common_utils::{errors::CustomResult, signals::get_allowed_signals};
|
||||
use common_utils::{errors::CustomResult, id_type, signals::get_allowed_signals};
|
||||
use diesel_models::enums;
|
||||
pub use diesel_models::{self, process_tracker as storage};
|
||||
use error_stack::ResultExt;
|
||||
@ -42,7 +42,7 @@ pub async fn start_consumer<T: SchedulerAppState + 'static, U: SchedulerSessionS
|
||||
app_state_to_session_state: F,
|
||||
) -> CustomResult<(), errors::ProcessTrackerError>
|
||||
where
|
||||
F: Fn(&T, &str) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
F: Fn(&T, &id_type::TenantId) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
{
|
||||
use std::time::Duration;
|
||||
|
||||
@ -88,7 +88,7 @@ where
|
||||
let start_time = std_time::Instant::now();
|
||||
let tenants = state.get_tenants();
|
||||
for tenant in tenants {
|
||||
let session_state = app_state_to_session_state(state, tenant.as_str())?;
|
||||
let session_state = app_state_to_session_state(state, &tenant)?;
|
||||
pt_utils::consumer_operation_handler(
|
||||
session_state.clone(),
|
||||
settings.clone(),
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_utils::errors::CustomResult;
|
||||
use common_utils::{errors::CustomResult, id_type};
|
||||
use diesel_models::enums::ProcessTrackerStatus;
|
||||
use error_stack::{report, ResultExt};
|
||||
use router_env::{
|
||||
@ -27,7 +27,7 @@ pub async fn start_producer<T, U, F>(
|
||||
app_state_to_session_state: F,
|
||||
) -> CustomResult<(), errors::ProcessTrackerError>
|
||||
where
|
||||
F: Fn(&T, &str) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
F: Fn(&T, &id_type::TenantId) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
T: SchedulerAppState,
|
||||
U: SchedulerSessionState,
|
||||
{
|
||||
@ -69,7 +69,7 @@ where
|
||||
interval.tick().await;
|
||||
let tenants = state.get_tenants();
|
||||
for tenant in tenants {
|
||||
let session_state = app_state_to_session_state(state, tenant.as_str())?;
|
||||
let session_state = app_state_to_session_state(state, &tenant)?;
|
||||
match run_producer_flow(&session_state, &scheduler_settings).await {
|
||||
Ok(_) => (),
|
||||
Err(error) => {
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_utils::errors::CustomResult;
|
||||
use common_utils::{errors::CustomResult, id_type};
|
||||
use storage_impl::mock_db::MockDb;
|
||||
#[cfg(feature = "kv_store")]
|
||||
use storage_impl::KVRouterStore;
|
||||
@ -52,7 +52,7 @@ impl SchedulerInterface for MockDb {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait SchedulerAppState: Send + Sync + Clone {
|
||||
fn get_tenants(&self) -> Vec<String>;
|
||||
fn get_tenants(&self) -> Vec<id_type::TenantId>;
|
||||
}
|
||||
#[async_trait::async_trait]
|
||||
pub trait SchedulerSessionState: Send + Sync + Clone {
|
||||
@ -71,7 +71,7 @@ pub async fn start_process_tracker<
|
||||
app_state_to_session_state: F,
|
||||
) -> CustomResult<(), errors::ProcessTrackerError>
|
||||
where
|
||||
F: Fn(&T, &str) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
F: Fn(&T, &id_type::TenantId) -> CustomResult<U, errors::ProcessTrackerError>,
|
||||
{
|
||||
match scheduler_flow {
|
||||
SchedulerFlow::Producer => {
|
||||
|
||||
Reference in New Issue
Block a user