diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 6469df5077..f40b81af68 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -27,6 +27,12 @@ impl RedisValue { pub fn into_inner(self) -> FredRedisValue { self.inner } + + pub fn from_bytes(val: Vec) -> Self { + Self { + inner: FredRedisValue::Bytes(val.into()), + } + } pub fn from_string(value: String) -> Self { Self { inner: FredRedisValue::String(value.into()), @@ -34,6 +40,12 @@ impl RedisValue { } } +impl From for FredRedisValue { + fn from(v: RedisValue) -> Self { + v.inner + } +} + #[derive(Debug, serde::Deserialize, Clone)] #[serde(default)] pub struct RedisSettings { diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index 6cfadbcb25..ba4a8200fa 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -2,7 +2,7 @@ use std::{any::Any, borrow::Cow, fmt::Debug, sync::Arc}; use common_utils::{ errors::{self, CustomResult}, - ext_traits::AsyncExt, + ext_traits::{AsyncExt, ByteSliceExt}, }; use dyn_clone::DynClone; use error_stack::{Report, ResultExt}; @@ -23,30 +23,6 @@ use crate::{ /// Redis channel name used for publishing invalidation messages pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate"; -/// Prefix for config cache key -const CONFIG_CACHE_PREFIX: &str = "config"; - -/// Prefix for accounts cache key -const ACCOUNTS_CACHE_PREFIX: &str = "accounts"; - -/// Prefix for routing cache key -const ROUTING_CACHE_PREFIX: &str = "routing"; - -/// Prefix for three ds decision manager cache key -const DECISION_MANAGER_CACHE_PREFIX: &str = "decision_manager"; - -/// Prefix for surcharge cache key -const SURCHARGE_CACHE_PREFIX: &str = "surcharge"; - -/// Prefix for cgraph cache key -const CGRAPH_CACHE_PREFIX: &str = "cgraph"; - -/// Prefix for all kinds of cache key -const ALL_CACHE_PREFIX: &str = "all_cache_kind"; - -/// Prefix for PM Filter cgraph cache key -const PM_FILTERS_CGRAPH_CACHE_PREFIX: &str = "pm_filters_cgraph"; - /// Time to live 30 mins const CACHE_TTL: u64 = 30 * 60; @@ -101,6 +77,13 @@ pub trait Cacheable: Any + Send + Sync + DynClone { fn as_any(&self) -> &dyn Any; } +#[derive(serde::Serialize, serde::Deserialize)] +pub struct CacheRedact<'a> { + pub tenant: String, + pub kind: CacheKind<'a>, +} + +#[derive(serde::Serialize, serde::Deserialize)] pub enum CacheKind<'a> { Config(Cow<'a, str>), Accounts(Cow<'a, str>), @@ -112,45 +95,30 @@ pub enum CacheKind<'a> { All(Cow<'a, str>), } -impl<'a> From> for RedisValue { - fn from(kind: CacheKind<'a>) -> Self { - let value = match kind { - CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"), - CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"), - CacheKind::Routing(s) => format!("{ROUTING_CACHE_PREFIX},{s}"), - CacheKind::DecisionManager(s) => format!("{DECISION_MANAGER_CACHE_PREFIX},{s}"), - CacheKind::Surcharge(s) => format!("{SURCHARGE_CACHE_PREFIX},{s}"), - CacheKind::CGraph(s) => format!("{CGRAPH_CACHE_PREFIX},{s}"), - CacheKind::PmFiltersCGraph(s) => format!("{PM_FILTERS_CGRAPH_CACHE_PREFIX},{s}"), - CacheKind::All(s) => format!("{ALL_CACHE_PREFIX},{s}"), - }; - Self::from_string(value) +impl<'a> TryFrom> for RedisValue { + type Error = Report; + fn try_from(v: CacheRedact<'a>) -> Result { + Ok(Self::from_bytes(serde_json::to_vec(&v).change_context( + errors::ValidationError::InvalidValue { + message: "Invalid publish key provided in pubsub".into(), + }, + )?)) } } -impl<'a> TryFrom for CacheKind<'a> { +impl<'a> TryFrom for CacheRedact<'a> { type Error = Report; - fn try_from(kind: RedisValue) -> Result { - let validation_err = errors::ValidationError::InvalidValue { - message: "Invalid publish key provided in pubsub".into(), - }; - let kind = kind.as_string().ok_or(validation_err.clone())?; - let split = kind.split_once(',').ok_or(validation_err.clone())?; - match split.0 { - ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))), - CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))), - ROUTING_CACHE_PREFIX => Ok(Self::Routing(Cow::Owned(split.1.to_string()))), - DECISION_MANAGER_CACHE_PREFIX => { - Ok(Self::DecisionManager(Cow::Owned(split.1.to_string()))) - } - SURCHARGE_CACHE_PREFIX => Ok(Self::Surcharge(Cow::Owned(split.1.to_string()))), - CGRAPH_CACHE_PREFIX => Ok(Self::CGraph(Cow::Owned(split.1.to_string()))), - PM_FILTERS_CGRAPH_CACHE_PREFIX => { - Ok(Self::PmFiltersCGraph(Cow::Owned(split.1.to_string()))) - } - ALL_CACHE_PREFIX => Ok(Self::All(Cow::Owned(split.1.to_string()))), - _ => Err(validation_err.into()), - } + + fn try_from(v: RedisValue) -> Result { + let bytes = v.as_bytes().ok_or(errors::ValidationError::InvalidValue { + message: "InvalidValue received in pubsub".to_string(), + })?; + + bytes + .parse_struct("CacheRedact") + .change_context(errors::ValidationError::InvalidValue { + message: "Unable to deserialize the value from pubsub".to_string(), + }) } } diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs index 89877f7e8d..8f24b62fca 100644 --- a/crates/storage_impl/src/redis/pub_sub.rs +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -5,8 +5,8 @@ use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; use router_env::{logger, tracing::Instrument}; use crate::redis::cache::{ - CacheKey, CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, DECISION_MANAGER_CACHE, - PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE, + CacheKey, CacheKind, CacheRedact, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, + DECISION_MANAGER_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE, }; #[async_trait::async_trait] @@ -66,15 +66,23 @@ impl PubSubInterface for std::sync::Arc { channel: &str, key: CacheKind<'a>, ) -> error_stack::Result { + let key = CacheRedact { + kind: key, + tenant: self.key_prefix.clone(), + }; + self.publisher - .publish(channel, RedisValue::from(key).into_inner()) + .publish( + channel, + RedisValue::try_from(key).change_context(redis_errors::RedisError::PublishError)?, + ) .await .change_context(redis_errors::RedisError::SubscribeError) } #[inline] async fn on_message(&self) -> error_stack::Result<(), redis_errors::RedisError> { - logger::debug!("Started on message: {:?}", self.key_prefix); + logger::debug!("Started on message"); let mut rx = self.subscriber.on_message(); while let Ok(message) = rx.recv().await { let channel_name = message.channel.to_string(); @@ -82,7 +90,7 @@ impl PubSubInterface for std::sync::Arc { match channel_name.as_str() { super::cache::IMC_INVALIDATION_CHANNEL => { - let key = match CacheKind::try_from(RedisValue::new(message.value)) + let message = match CacheRedact::try_from(RedisValue::new(message.value)) .change_context(redis_errors::RedisError::OnMessageError) { Ok(value) => value, @@ -92,12 +100,12 @@ impl PubSubInterface for std::sync::Arc { } }; - let key = match key { + let key = match message.kind { CacheKind::Config(key) => { CONFIG_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -106,7 +114,7 @@ impl PubSubInterface for std::sync::Arc { ACCOUNTS_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -115,7 +123,7 @@ impl PubSubInterface for std::sync::Arc { CGRAPH_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -124,7 +132,7 @@ impl PubSubInterface for std::sync::Arc { PM_FILTERS_CGRAPH_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -133,7 +141,7 @@ impl PubSubInterface for std::sync::Arc { ROUTING_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -142,7 +150,7 @@ impl PubSubInterface for std::sync::Arc { DECISION_MANAGER_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -151,7 +159,7 @@ impl PubSubInterface for std::sync::Arc { SURCHARGE_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; key @@ -160,43 +168,43 @@ impl PubSubInterface for std::sync::Arc { CONFIG_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; ACCOUNTS_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; CGRAPH_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; PM_FILTERS_CGRAPH_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; ROUTING_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; DECISION_MANAGER_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; SURCHARGE_CACHE .remove(CacheKey { key: key.to_string(), - prefix: self.key_prefix.clone(), + prefix: message.tenant.clone(), }) .await; @@ -210,7 +218,9 @@ impl PubSubInterface for std::sync::Arc { .ok(); logger::debug!( - "Handled message on channel {channel_name} - Done invalidating {key}" + key_prefix=?message.tenant.clone(), + channel_name=?channel_name, + "Done invalidating {key}" ); } _ => {