fix(users): use global Redis prefix for all user-related cache operations (#7932)

This commit is contained in:
Sandeep Kumar
2025-05-05 21:00:33 +05:30
committed by GitHub
parent afeef302ca
commit 6750b10f7a
8 changed files with 68 additions and 59 deletions

View File

@ -155,8 +155,10 @@ pub trait GlobalStorageInterface:
+ user_role::UserRoleInterface
+ user_key_store::UserKeyStoreInterface
+ role::RoleInterface
+ RedisConnInterface
+ 'static
{
fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)>;
}
#[async_trait::async_trait]
@ -217,7 +219,11 @@ impl StorageInterface for Store {
}
#[async_trait::async_trait]
impl GlobalStorageInterface for Store {}
impl GlobalStorageInterface for Store {
fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> {
Box::new(self.clone())
}
}
impl AccountsStorageInterface for Store {}
@ -233,7 +239,11 @@ impl StorageInterface for MockDb {
}
#[async_trait::async_trait]
impl GlobalStorageInterface for MockDb {}
impl GlobalStorageInterface for MockDb {
fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> {
Box::new(self.clone())
}
}
impl AccountsStorageInterface for MockDb {}

View File

@ -3269,7 +3269,11 @@ impl StorageInterface for KafkaStore {
}
}
impl GlobalStorageInterface for KafkaStore {}
impl GlobalStorageInterface for KafkaStore {
fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> {
Box::new(self.clone())
}
}
impl AccountsStorageInterface for KafkaStore {}
impl PaymentMethodsStorageInterface for KafkaStore {}

View File

@ -153,6 +153,7 @@ pub trait SessionStateInfo {
#[cfg(feature = "partial-auth")]
fn get_detached_auth(&self) -> RouterResult<(Blake3, &[u8])>;
fn session_state(&self) -> SessionState;
fn global_store(&self) -> Box<dyn GlobalStorageInterface>;
}
impl SessionStateInfo for SessionState {
@ -209,6 +210,9 @@ impl SessionStateInfo for SessionState {
fn session_state(&self) -> SessionState {
self.clone()
}
fn global_store(&self) -> Box<(dyn GlobalStorageInterface)> {
self.global_store.to_owned()
}
}
#[derive(Clone)]
pub struct AppState {

View File

@ -27,7 +27,8 @@ pub async fn insert_user_in_blacklist(state: &SessionState, user_id: &str) -> Us
let user_blacklist_key = format!("{}{}", USER_BLACKLIST_PREFIX, user_id);
let expiry =
expiry_to_i64(JWT_TOKEN_TIME_IN_SECS).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection(state).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection_for_global_tenant(state)
.change_context(UserErrors::InternalServerError)?;
redis_conn
.set_key_with_expiry(
&user_blacklist_key.as_str().into(),
@ -43,7 +44,8 @@ pub async fn insert_role_in_blacklist(state: &SessionState, role_id: &str) -> Us
let role_blacklist_key = format!("{}{}", ROLE_BLACKLIST_PREFIX, role_id);
let expiry =
expiry_to_i64(JWT_TOKEN_TIME_IN_SECS).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection(state).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection_for_global_tenant(state)
.change_context(UserErrors::InternalServerError)?;
redis_conn
.set_key_with_expiry(
&role_blacklist_key.as_str().into(),
@ -59,7 +61,7 @@ pub async fn insert_role_in_blacklist(state: &SessionState, role_id: &str) -> Us
#[cfg(feature = "olap")]
async fn invalidate_role_cache(state: &SessionState, role_id: &str) -> RouterResult<()> {
let redis_conn = get_redis_connection(state)?;
let redis_conn = get_redis_connection_for_global_tenant(state)?;
redis_conn
.delete_key(&authz::get_cache_key_from_role_id(role_id).as_str().into())
.await
@ -74,7 +76,7 @@ pub async fn check_user_in_blacklist<A: SessionStateInfo>(
) -> RouterResult<bool> {
let token = format!("{}{}", USER_BLACKLIST_PREFIX, user_id);
let token_issued_at = expiry_to_i64(token_expiry - JWT_TOKEN_TIME_IN_SECS)?;
let redis_conn = get_redis_connection(state)?;
let redis_conn = get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_key::<Option<i64>>(&token.as_str().into())
.await
@ -89,7 +91,7 @@ pub async fn check_role_in_blacklist<A: SessionStateInfo>(
) -> RouterResult<bool> {
let token = format!("{}{}", ROLE_BLACKLIST_PREFIX, role_id);
let token_issued_at = expiry_to_i64(token_expiry - JWT_TOKEN_TIME_IN_SECS)?;
let redis_conn = get_redis_connection(state)?;
let redis_conn = get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_key::<Option<i64>>(&token.as_str().into())
.await
@ -99,7 +101,8 @@ pub async fn check_role_in_blacklist<A: SessionStateInfo>(
#[cfg(feature = "email")]
pub async fn insert_email_token_in_blacklist(state: &SessionState, token: &str) -> UserResult<()> {
let redis_conn = get_redis_connection(state).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection_for_global_tenant(state)
.change_context(UserErrors::InternalServerError)?;
let blacklist_key = format!("{}{token}", EMAIL_TOKEN_BLACKLIST_PREFIX);
let expiry =
expiry_to_i64(EMAIL_TOKEN_TIME_IN_SECS).change_context(UserErrors::InternalServerError)?;
@ -111,7 +114,8 @@ pub async fn insert_email_token_in_blacklist(state: &SessionState, token: &str)
#[cfg(feature = "email")]
pub async fn check_email_token_in_blacklist(state: &SessionState, token: &str) -> UserResult<()> {
let redis_conn = get_redis_connection(state).change_context(UserErrors::InternalServerError)?;
let redis_conn = get_redis_connection_for_global_tenant(state)
.change_context(UserErrors::InternalServerError)?;
let blacklist_key = format!("{}{token}", EMAIL_TOKEN_BLACKLIST_PREFIX);
let key_exists = redis_conn
.exists::<()>(&blacklist_key.as_str().into())
@ -124,9 +128,11 @@ pub async fn check_email_token_in_blacklist(state: &SessionState, token: &str) -
Ok(())
}
fn get_redis_connection<A: SessionStateInfo>(state: &A) -> RouterResult<Arc<RedisConnectionPool>> {
fn get_redis_connection_for_global_tenant<A: SessionStateInfo>(
state: &A,
) -> RouterResult<Arc<RedisConnectionPool>> {
state
.store()
.global_store()
.get_redis_conn()
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get redis connection")

View File

@ -61,7 +61,7 @@ async fn get_role_info_from_cache<A>(state: &A, role_id: &str) -> RouterResult<r
where
A: SessionStateInfo + Sync,
{
let redis_conn = get_redis_connection(state)?;
let redis_conn = get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_and_deserialize_key(&get_cache_key_from_role_id(role_id).into(), "RoleInfo")
@ -100,7 +100,7 @@ pub async fn set_role_info_in_cache<A>(
where
A: SessionStateInfo + Sync,
{
let redis_conn = get_redis_connection(state)?;
let redis_conn = get_redis_connection_for_global_tenant(state)?;
redis_conn
.serialize_and_set_key_with_expiry(
@ -143,9 +143,11 @@ pub fn check_tenant(
Ok(())
}
fn get_redis_connection<A: SessionStateInfo>(state: &A) -> RouterResult<Arc<RedisConnectionPool>> {
fn get_redis_connection_for_global_tenant<A: SessionStateInfo>(
state: &A,
) -> RouterResult<Arc<RedisConnectionPool>> {
state
.store()
.global_store()
.get_redis_conn()
.change_context(ApiErrorResponse::InternalServerError)
.attach_printable("Failed to get redis connection")

View File

@ -34,7 +34,7 @@ pub async fn get_authorization_url(
// Save csrf & nonce as key value respectively
let key = get_oidc_redis_key(csrf_token.secret());
get_redis_connection(&state)?
get_redis_connection_for_global_tenant(&state)?
.set_key_with_expiry(&key.into(), nonce.secret(), consts::user::REDIS_SSO_TTL)
.await
.change_context(UserErrors::InternalServerError)
@ -138,7 +138,7 @@ async fn get_nonce_from_redis(
state: &SessionState,
redirect_state: &Secret<String>,
) -> UserResult<oidc::Nonce> {
let redis_connection = get_redis_connection(state)?;
let redis_connection = get_redis_connection_for_global_tenant(state)?;
let redirect_state = redirect_state.clone().expose();
let key = get_oidc_redis_key(&redirect_state);
redis_connection
@ -188,9 +188,11 @@ fn get_oidc_redis_key(csrf: &str) -> String {
format!("{}OIDC_{}", consts::user::REDIS_SSO_PREFIX, csrf)
}
fn get_redis_connection(state: &SessionState) -> UserResult<std::sync::Arc<RedisConnectionPool>> {
fn get_redis_connection_for_global_tenant(
state: &SessionState,
) -> UserResult<std::sync::Arc<RedisConnectionPool>> {
state
.store
.global_store
.get_redis_conn()
.change_context(UserErrors::InternalServerError)
.attach_printable("Failed to get redis connection")

View File

@ -135,33 +135,14 @@ pub async fn get_user_from_db_by_email(
.map(UserFromStorage::from)
}
pub fn get_redis_connection(state: &SessionState) -> UserResult<Arc<RedisConnectionPool>> {
state
.store
.get_redis_conn()
.change_context(UserErrors::InternalServerError)
.attach_printable("Failed to get redis connection")
}
pub fn get_redis_connection_for_global_tenant(
state: &SessionState,
) -> UserResult<Arc<RedisConnectionPool>> {
let redis_connection_pool = state
.store
state
.global_store
.get_redis_conn()
.change_context(UserErrors::InternalServerError)
.attach_printable("Failed to get redis connection")?;
let global_tenant_prefix = &state.conf.multitenancy.global_tenant.redis_key_prefix;
Ok(Arc::new(RedisConnectionPool {
pool: Arc::clone(&redis_connection_pool.pool),
key_prefix: global_tenant_prefix.to_string(),
config: Arc::clone(&redis_connection_pool.config),
subscriber: Arc::clone(&redis_connection_pool.subscriber),
publisher: Arc::clone(&redis_connection_pool.publisher),
is_redis_available: Arc::clone(&redis_connection_pool.is_redis_available),
}))
.attach_printable("Failed to get redis connection")
}
impl ForeignFrom<&user_api::AuthConfig> for UserAuthType {
@ -267,7 +248,7 @@ pub async fn set_sso_id_in_redis(
oidc_state: Secret<String>,
sso_id: String,
) -> UserResult<()> {
let connection = get_redis_connection(state)?;
let connection = get_redis_connection_for_global_tenant(state)?;
let key = get_oidc_key(&oidc_state.expose());
connection
.set_key_with_expiry(&key.into(), sso_id, REDIS_SSO_TTL)
@ -280,7 +261,7 @@ pub async fn get_sso_id_from_redis(
state: &SessionState,
oidc_state: Secret<String>,
) -> UserResult<String> {
let connection = get_redis_connection(state)?;
let connection = get_redis_connection_for_global_tenant(state)?;
let key = get_oidc_key(&oidc_state.expose());
connection
.get_key::<Option<String>>(&key.into())

View File

@ -33,7 +33,7 @@ pub fn generate_default_totp(
}
pub async fn check_totp_in_redis(state: &SessionState, user_id: &str) -> UserResult<bool> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_TOTP_PREFIX, user_id);
redis_conn
.exists::<()>(&key.into())
@ -42,7 +42,7 @@ pub async fn check_totp_in_redis(state: &SessionState, user_id: &str) -> UserRes
}
pub async fn check_recovery_code_in_redis(state: &SessionState, user_id: &str) -> UserResult<bool> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_RECOVERY_CODE_PREFIX, user_id);
redis_conn
.exists::<()>(&key.into())
@ -51,7 +51,7 @@ pub async fn check_recovery_code_in_redis(state: &SessionState, user_id: &str) -
}
pub async fn insert_totp_in_redis(state: &SessionState, user_id: &str) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_TOTP_PREFIX, user_id);
redis_conn
.set_key_with_expiry(
@ -68,7 +68,7 @@ pub async fn insert_totp_secret_in_redis(
user_id: &str,
secret: &masking::Secret<String>,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.set_key_with_expiry(
&get_totp_secret_key(user_id).into(),
@ -83,7 +83,7 @@ pub async fn get_totp_secret_from_redis(
state: &SessionState,
user_id: &str,
) -> UserResult<Option<masking::Secret<String>>> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_key::<Option<String>>(&get_totp_secret_key(user_id).into())
.await
@ -92,7 +92,7 @@ pub async fn get_totp_secret_from_redis(
}
pub async fn delete_totp_secret_from_redis(state: &SessionState, user_id: &str) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.delete_key(&get_totp_secret_key(user_id).into())
.await
@ -105,7 +105,7 @@ fn get_totp_secret_key(user_id: &str) -> String {
}
pub async fn insert_recovery_code_in_redis(state: &SessionState, user_id: &str) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_RECOVERY_CODE_PREFIX, user_id);
redis_conn
.set_key_with_expiry(
@ -118,7 +118,7 @@ pub async fn insert_recovery_code_in_redis(state: &SessionState, user_id: &str)
}
pub async fn delete_totp_from_redis(state: &SessionState, user_id: &str) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_TOTP_PREFIX, user_id);
redis_conn
.delete_key(&key.into())
@ -131,7 +131,7 @@ pub async fn delete_recovery_code_from_redis(
state: &SessionState,
user_id: &str,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
let key = format!("{}{}", consts::user::REDIS_RECOVERY_CODE_PREFIX, user_id);
redis_conn
.delete_key(&key.into())
@ -156,7 +156,7 @@ pub async fn insert_totp_attempts_in_redis(
user_id: &str,
user_totp_attempts: u8,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.set_key_with_expiry(
&get_totp_attempts_key(user_id).into(),
@ -167,7 +167,7 @@ pub async fn insert_totp_attempts_in_redis(
.change_context(UserErrors::InternalServerError)
}
pub async fn get_totp_attempts_from_redis(state: &SessionState, user_id: &str) -> UserResult<u8> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_key::<Option<u8>>(&get_totp_attempts_key(user_id).into())
.await
@ -180,7 +180,7 @@ pub async fn insert_recovery_code_attempts_in_redis(
user_id: &str,
user_recovery_code_attempts: u8,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.set_key_with_expiry(
&get_recovery_code_attempts_key(user_id).into(),
@ -195,7 +195,7 @@ pub async fn get_recovery_code_attempts_from_redis(
state: &SessionState,
user_id: &str,
) -> UserResult<u8> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.get_key::<Option<u8>>(&get_recovery_code_attempts_key(user_id).into())
.await
@ -207,7 +207,7 @@ pub async fn delete_totp_attempts_from_redis(
state: &SessionState,
user_id: &str,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.delete_key(&get_totp_attempts_key(user_id).into())
.await
@ -219,7 +219,7 @@ pub async fn delete_recovery_code_attempts_from_redis(
state: &SessionState,
user_id: &str,
) -> UserResult<()> {
let redis_conn = super::get_redis_connection(state)?;
let redis_conn = super::get_redis_connection_for_global_tenant(state)?;
redis_conn
.delete_key(&get_recovery_code_attempts_key(user_id).into())
.await