fix: cache on multitenancy (#5561)

This commit is contained in:
Kartikeya Hegde
2024-08-09 13:29:53 +05:30
committed by GitHub
parent 920243e1d4
commit 74632aebea
3 changed files with 71 additions and 81 deletions

View File

@ -27,6 +27,12 @@ impl RedisValue {
pub fn into_inner(self) -> FredRedisValue { pub fn into_inner(self) -> FredRedisValue {
self.inner self.inner
} }
pub fn from_bytes(val: Vec<u8>) -> Self {
Self {
inner: FredRedisValue::Bytes(val.into()),
}
}
pub fn from_string(value: String) -> Self { pub fn from_string(value: String) -> Self {
Self { Self {
inner: FredRedisValue::String(value.into()), inner: FredRedisValue::String(value.into()),
@ -34,6 +40,12 @@ impl RedisValue {
} }
} }
impl From<RedisValue> for FredRedisValue {
fn from(v: RedisValue) -> Self {
v.inner
}
}
#[derive(Debug, serde::Deserialize, Clone)] #[derive(Debug, serde::Deserialize, Clone)]
#[serde(default)] #[serde(default)]
pub struct RedisSettings { pub struct RedisSettings {

View File

@ -2,7 +2,7 @@ use std::{any::Any, borrow::Cow, fmt::Debug, sync::Arc};
use common_utils::{ use common_utils::{
errors::{self, CustomResult}, errors::{self, CustomResult},
ext_traits::AsyncExt, ext_traits::{AsyncExt, ByteSliceExt},
}; };
use dyn_clone::DynClone; use dyn_clone::DynClone;
use error_stack::{Report, ResultExt}; use error_stack::{Report, ResultExt};
@ -23,30 +23,6 @@ use crate::{
/// Redis channel name used for publishing invalidation messages /// Redis channel name used for publishing invalidation messages
pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate"; pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate";
/// Prefix for config cache key
const CONFIG_CACHE_PREFIX: &str = "config";
/// Prefix for accounts cache key
const ACCOUNTS_CACHE_PREFIX: &str = "accounts";
/// Prefix for routing cache key
const ROUTING_CACHE_PREFIX: &str = "routing";
/// Prefix for three ds decision manager cache key
const DECISION_MANAGER_CACHE_PREFIX: &str = "decision_manager";
/// Prefix for surcharge cache key
const SURCHARGE_CACHE_PREFIX: &str = "surcharge";
/// Prefix for cgraph cache key
const CGRAPH_CACHE_PREFIX: &str = "cgraph";
/// Prefix for all kinds of cache key
const ALL_CACHE_PREFIX: &str = "all_cache_kind";
/// Prefix for PM Filter cgraph cache key
const PM_FILTERS_CGRAPH_CACHE_PREFIX: &str = "pm_filters_cgraph";
/// Time to live 30 mins /// Time to live 30 mins
const CACHE_TTL: u64 = 30 * 60; const CACHE_TTL: u64 = 30 * 60;
@ -101,6 +77,13 @@ pub trait Cacheable: Any + Send + Sync + DynClone {
fn as_any(&self) -> &dyn Any; fn as_any(&self) -> &dyn Any;
} }
#[derive(serde::Serialize, serde::Deserialize)]
pub struct CacheRedact<'a> {
pub tenant: String,
pub kind: CacheKind<'a>,
}
#[derive(serde::Serialize, serde::Deserialize)]
pub enum CacheKind<'a> { pub enum CacheKind<'a> {
Config(Cow<'a, str>), Config(Cow<'a, str>),
Accounts(Cow<'a, str>), Accounts(Cow<'a, str>),
@ -112,45 +95,30 @@ pub enum CacheKind<'a> {
All(Cow<'a, str>), All(Cow<'a, str>),
} }
impl<'a> From<CacheKind<'a>> for RedisValue { impl<'a> TryFrom<CacheRedact<'a>> for RedisValue {
fn from(kind: CacheKind<'a>) -> Self { type Error = Report<errors::ValidationError>;
let value = match kind { fn try_from(v: CacheRedact<'a>) -> Result<Self, Self::Error> {
CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"), Ok(Self::from_bytes(serde_json::to_vec(&v).change_context(
CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"), errors::ValidationError::InvalidValue {
CacheKind::Routing(s) => format!("{ROUTING_CACHE_PREFIX},{s}"), message: "Invalid publish key provided in pubsub".into(),
CacheKind::DecisionManager(s) => format!("{DECISION_MANAGER_CACHE_PREFIX},{s}"), },
CacheKind::Surcharge(s) => format!("{SURCHARGE_CACHE_PREFIX},{s}"), )?))
CacheKind::CGraph(s) => format!("{CGRAPH_CACHE_PREFIX},{s}"),
CacheKind::PmFiltersCGraph(s) => format!("{PM_FILTERS_CGRAPH_CACHE_PREFIX},{s}"),
CacheKind::All(s) => format!("{ALL_CACHE_PREFIX},{s}"),
};
Self::from_string(value)
} }
} }
impl<'a> TryFrom<RedisValue> for CacheKind<'a> { impl<'a> TryFrom<RedisValue> for CacheRedact<'a> {
type Error = Report<errors::ValidationError>; type Error = Report<errors::ValidationError>;
fn try_from(kind: RedisValue) -> Result<Self, Self::Error> {
let validation_err = errors::ValidationError::InvalidValue { fn try_from(v: RedisValue) -> Result<Self, Self::Error> {
message: "Invalid publish key provided in pubsub".into(), let bytes = v.as_bytes().ok_or(errors::ValidationError::InvalidValue {
}; message: "InvalidValue received in pubsub".to_string(),
let kind = kind.as_string().ok_or(validation_err.clone())?; })?;
let split = kind.split_once(',').ok_or(validation_err.clone())?;
match split.0 { bytes
ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))), .parse_struct("CacheRedact")
CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))), .change_context(errors::ValidationError::InvalidValue {
ROUTING_CACHE_PREFIX => Ok(Self::Routing(Cow::Owned(split.1.to_string()))), message: "Unable to deserialize the value from pubsub".to_string(),
DECISION_MANAGER_CACHE_PREFIX => { })
Ok(Self::DecisionManager(Cow::Owned(split.1.to_string())))
}
SURCHARGE_CACHE_PREFIX => Ok(Self::Surcharge(Cow::Owned(split.1.to_string()))),
CGRAPH_CACHE_PREFIX => Ok(Self::CGraph(Cow::Owned(split.1.to_string()))),
PM_FILTERS_CGRAPH_CACHE_PREFIX => {
Ok(Self::PmFiltersCGraph(Cow::Owned(split.1.to_string())))
}
ALL_CACHE_PREFIX => Ok(Self::All(Cow::Owned(split.1.to_string()))),
_ => Err(validation_err.into()),
}
} }
} }

View File

@ -5,8 +5,8 @@ use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
use router_env::{logger, tracing::Instrument}; use router_env::{logger, tracing::Instrument};
use crate::redis::cache::{ use crate::redis::cache::{
CacheKey, CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, DECISION_MANAGER_CACHE, CacheKey, CacheKind, CacheRedact, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE,
PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE, DECISION_MANAGER_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE,
}; };
#[async_trait::async_trait] #[async_trait::async_trait]
@ -66,15 +66,23 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
channel: &str, channel: &str,
key: CacheKind<'a>, key: CacheKind<'a>,
) -> error_stack::Result<usize, redis_errors::RedisError> { ) -> error_stack::Result<usize, redis_errors::RedisError> {
let key = CacheRedact {
kind: key,
tenant: self.key_prefix.clone(),
};
self.publisher self.publisher
.publish(channel, RedisValue::from(key).into_inner()) .publish(
channel,
RedisValue::try_from(key).change_context(redis_errors::RedisError::PublishError)?,
)
.await .await
.change_context(redis_errors::RedisError::SubscribeError) .change_context(redis_errors::RedisError::SubscribeError)
} }
#[inline] #[inline]
async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> { async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> {
logger::debug!("Started on message: {:?}", self.key_prefix); logger::debug!("Started on message");
let mut rx = self.subscriber.on_message(); let mut rx = self.subscriber.on_message();
while let Ok(message) = rx.recv().await { while let Ok(message) = rx.recv().await {
let channel_name = message.channel.to_string(); let channel_name = message.channel.to_string();
@ -82,7 +90,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
match channel_name.as_str() { match channel_name.as_str() {
super::cache::IMC_INVALIDATION_CHANNEL => { super::cache::IMC_INVALIDATION_CHANNEL => {
let key = match CacheKind::try_from(RedisValue::new(message.value)) let message = match CacheRedact::try_from(RedisValue::new(message.value))
.change_context(redis_errors::RedisError::OnMessageError) .change_context(redis_errors::RedisError::OnMessageError)
{ {
Ok(value) => value, Ok(value) => value,
@ -92,12 +100,12 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
} }
}; };
let key = match key { let key = match message.kind {
CacheKind::Config(key) => { CacheKind::Config(key) => {
CONFIG_CACHE CONFIG_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -106,7 +114,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
ACCOUNTS_CACHE ACCOUNTS_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -115,7 +123,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
CGRAPH_CACHE CGRAPH_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -124,7 +132,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
PM_FILTERS_CGRAPH_CACHE PM_FILTERS_CGRAPH_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -133,7 +141,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
ROUTING_CACHE ROUTING_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -142,7 +150,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
DECISION_MANAGER_CACHE DECISION_MANAGER_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -151,7 +159,7 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
SURCHARGE_CACHE SURCHARGE_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
key key
@ -160,43 +168,43 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
CONFIG_CACHE CONFIG_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
ACCOUNTS_CACHE ACCOUNTS_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
CGRAPH_CACHE CGRAPH_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
PM_FILTERS_CGRAPH_CACHE PM_FILTERS_CGRAPH_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
ROUTING_CACHE ROUTING_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
DECISION_MANAGER_CACHE DECISION_MANAGER_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
SURCHARGE_CACHE SURCHARGE_CACHE
.remove(CacheKey { .remove(CacheKey {
key: key.to_string(), key: key.to_string(),
prefix: self.key_prefix.clone(), prefix: message.tenant.clone(),
}) })
.await; .await;
@ -210,7 +218,9 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
.ok(); .ok();
logger::debug!( logger::debug!(
"Handled message on channel {channel_name} - Done invalidating {key}" key_prefix=?message.tenant.clone(),
channel_name=?channel_name,
"Done invalidating {key}"
); );
} }
_ => { _ => {