chore: remove redundant caching code (#4804)

This commit is contained in:
Sanchith Hegde
2024-05-30 13:06:52 +05:30
committed by GitHub
parent d790f8f361
commit 971ef1fb8f
17 changed files with 149 additions and 283 deletions

View File

@ -4,7 +4,7 @@ use diesel_models as store;
use error_stack::ResultExt;
use hyperswitch_domain_models::errors::{StorageError, StorageResult};
use masking::StrongSecret;
use redis::{kv_store::RedisConnInterface, RedisStore};
use redis::{kv_store::RedisConnInterface, pub_sub::PubSubInterface, RedisStore};
mod address;
pub mod config;
pub mod connection;
@ -105,7 +105,8 @@ impl<T: DatabaseStore> RouterStore<T> {
.attach_printable("Failed to create cache store")?;
cache_store.set_error_callback(cache_error_signal);
cache_store
.subscribe_to_channel(inmemory_cache_stream)
.redis_conn
.subscribe(inmemory_cache_stream)
.await
.change_context(StorageError::InitializationError)
.attach_printable("Failed to subscribe to inmemory cache stream")?;

View File

@ -4,9 +4,7 @@ pub mod pub_sub;
use std::sync::{atomic, Arc};
use error_stack::ResultExt;
use redis_interface::PubsubInterface;
use router_env::{logger, tracing::Instrument};
use router_env::tracing::Instrument;
use self::{kv_store::RedisConnInterface, pub_sub::PubSubInterface};
@ -42,30 +40,6 @@ impl RedisStore {
.in_current_span(),
);
}
pub async fn subscribe_to_channel(
&self,
channel: &str,
) -> error_stack::Result<(), redis_interface::errors::RedisError> {
self.redis_conn.subscriber.manage_subscriptions();
self.redis_conn
.subscriber
.subscribe::<(), _>(channel)
.await
.change_context(redis_interface::errors::RedisError::SubscribeError)?;
let redis_clone = self.redis_conn.clone();
let _task_handle = tokio::spawn(
async move {
if let Err(e) = redis_clone.on_message().await {
logger::error!(pubsub_err=?e);
}
}
.in_current_span(),
);
Ok(())
}
}
impl RedisConnInterface for RedisStore {

View File

@ -6,14 +6,18 @@ use common_utils::{
};
use dyn_clone::DynClone;
use error_stack::{Report, ResultExt};
use hyperswitch_domain_models::errors::StorageError;
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
use redis_interface::{errors::RedisError, RedisValue};
use router_env::tracing::{self, instrument};
use super::{kv_store::RedisConnInterface, pub_sub::PubSubInterface};
use crate::{
errors::StorageError,
redis::{PubSubInterface, RedisConnInterface},
};
pub(crate) const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";
/// Redis channel name used for publishing invalidation messages
pub const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";
/// Prefix for config cache key
const CONFIG_CACHE_PREFIX: &str = "config";
@ -24,7 +28,7 @@ const ACCOUNTS_CACHE_PREFIX: &str = "accounts";
/// Prefix for routing cache key
const ROUTING_CACHE_PREFIX: &str = "routing";
/// Prefix for kgraph cache key
/// Prefix for cgraph cache key
const CGRAPH_CACHE_PREFIX: &str = "cgraph";
/// Prefix for PM Filter cgraph cache key
@ -165,6 +169,7 @@ impl Cache {
}
}
#[instrument(skip_all)]
pub async fn get_or_populate_redis<T, F, Fut>(
store: &(dyn RedisConnInterface + Send + Sync),
key: impl AsRef<str>,
@ -179,10 +184,9 @@ where
let key = key.as_ref();
let redis = &store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.change_context(StorageError::RedisError(
RedisError::RedisConnectionError.into(),
))
.attach_printable("Failed to get redis connection")?;
let redis_val = redis.get_and_deserialize_key::<T>(key, type_name).await;
let get_data_set_redis = || async {
@ -206,6 +210,7 @@ where
}
}
#[instrument(skip_all)]
pub async fn get_or_populate_in_memory<T, F, Fut>(
store: &(dyn RedisConnInterface + Send + Sync),
key: &str,
@ -227,8 +232,9 @@ where
}
}
#[instrument(skip_all)]
pub async fn redact_cache<T, F, Fut>(
store: &dyn RedisConnInterface,
store: &(dyn RedisConnInterface + Send + Sync),
key: &str,
fun: F,
in_memory: Option<&Cache>,
@ -242,10 +248,9 @@ where
let redis_conn = store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.change_context(StorageError::RedisError(
RedisError::RedisConnectionError.into(),
))
.attach_printable("Failed to get redis connection")?;
redis_conn
@ -255,26 +260,35 @@ where
Ok(data)
}
pub async fn publish_into_redact_channel<'a>(
store: &dyn RedisConnInterface,
key: CacheKind<'a>,
#[instrument(skip_all)]
pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a>> + Send>(
store: &(dyn RedisConnInterface + Send + Sync),
keys: K,
) -> CustomResult<usize, StorageError> {
let redis_conn = store
.get_redis_conn()
.map_err(|er| {
let error = format!("{}", er);
er.change_context(StorageError::RedisError(error))
})
.change_context(StorageError::RedisError(
RedisError::RedisConnectionError.into(),
))
.attach_printable("Failed to get redis connection")?;
redis_conn
.publish(PUB_SUB_CHANNEL, key)
.await
.change_context(StorageError::KVError)
let futures = keys.into_iter().map(|key| async {
redis_conn
.clone()
.publish(PUB_SUB_CHANNEL, key)
.await
.change_context(StorageError::KVError)
});
Ok(futures::future::try_join_all(futures)
.await?
.iter()
.sum::<usize>())
}
#[instrument(skip_all)]
pub async fn publish_and_redact<'a, T, F, Fut>(
store: &dyn RedisConnInterface,
store: &(dyn RedisConnInterface + Send + Sync),
key: CacheKind<'a>,
fun: F,
) -> CustomResult<T, StorageError>
@ -283,7 +297,23 @@ where
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, key).await?;
publish_into_redact_channel(store, [key]).await?;
Ok(data)
}
#[instrument(skip_all)]
pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>(
store: &(dyn RedisConnInterface + Send + Sync),
keys: K,
fun: F,
) -> CustomResult<T, StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, StorageError>> + Send,
K: IntoIterator<Item = CacheKind<'a>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, keys).await?;
Ok(data)
}

View File

@ -1,6 +1,6 @@
use error_stack::ResultExt;
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
use router_env::logger;
use router_env::{logger, tracing::Instrument};
use crate::redis::cache::{
CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE,
@ -20,7 +20,7 @@ pub trait PubSubInterface {
}
#[async_trait::async_trait]
impl PubSubInterface for redis_interface::RedisConnectionPool {
impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
#[inline]
async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError> {
// Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client.
@ -29,7 +29,18 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
self.subscriber
.subscribe(channel)
.await
.change_context(redis_errors::RedisError::SubscribeError)
.change_context(redis_errors::RedisError::SubscribeError)?;
let redis_clone = self.clone();
let _task_handle = tokio::spawn(
async move {
if let Err(pubsub_error) = redis_clone.on_message().await {
logger::error!(?pubsub_error);
}
}
.in_current_span(),
);
Ok(())
}
#[inline]