feat: add in-memory cache support for config table (#751)

Kartikeya Hegde
2023-03-21 17:26:48 +05:30
committed by GitHub
parent 568bf01a56
commit abedaae4e8
8 changed files with 157 additions and 71 deletions

View File

@@ -25,7 +25,8 @@ use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt};
use fred::interfaces::{ClientLike, PubsubInterface};
use fred::interfaces::ClientLike;
pub use fred::interfaces::PubsubInterface;
use futures::StreamExt;
use router_env::logger;
@@ -35,8 +36,8 @@ pub struct RedisConnectionPool {
pub pool: fred::pool::RedisPool,
config: RedisConfig,
join_handles: Vec<fred::types::ConnectHandle>,
subscriber: RedisClient,
publisher: RedisClient,
pub subscriber: RedisClient,
pub publisher: RedisClient,
pub is_redis_available: Arc<atomic::AtomicBool>,
}
@@ -153,44 +154,6 @@ impl RedisConnectionPool {
}
}
#[async_trait::async_trait]
pub trait PubSubInterface {
async fn subscribe(&self, channel: &str) -> CustomResult<usize, errors::RedisError>;
async fn publish(&self, channel: &str, key: &str) -> CustomResult<usize, errors::RedisError>;
async fn on_message(&self) -> CustomResult<(), errors::RedisError>;
}
#[async_trait::async_trait]
impl PubSubInterface for RedisConnectionPool {
#[inline]
async fn subscribe(&self, channel: &str) -> CustomResult<usize, errors::RedisError> {
self.subscriber
.subscribe(channel)
.await
.into_report()
.change_context(errors::RedisError::SubscribeError)
}
#[inline]
async fn publish(&self, channel: &str, key: &str) -> CustomResult<usize, errors::RedisError> {
self.publisher
.publish(channel, key)
.await
.into_report()
.change_context(errors::RedisError::SubscribeError)
}
#[inline]
async fn on_message(&self) -> CustomResult<(), errors::RedisError> {
let mut message = self.subscriber.on_message();
while let Some((_, key)) = message.next().await {
let key = key
.as_string()
.ok_or::<errors::RedisError>(errors::RedisError::DeleteFailed)?;
self.delete_key(&key).await?;
}
Ok(())
}
}
struct RedisConfig {
default_ttl: u32,
default_stream_read_count: u64,
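
The pub/sub trait moves out of redis_interface because its on_message handler now also needs to invalidate the router's in-memory CONFIG_CACHE, which this crate cannot depend on. Instead, the crate makes the subscriber and publisher handles public and re-exports fred's PubsubInterface. A rough sketch of what that enables for a consuming crate, assuming the same fred API the removed code relied on (subscribe reporting the subscription count, on_message yielding (channel, value) pairs); the channel name and function below are made up for illustration:

    use futures::StreamExt;
    use redis_interface::{PubsubInterface, RedisConnectionPool};

    // Hypothetical channel name, used only in this sketch; the router uses consts::PUB_SUB_CHANNEL.
    const CHANNEL: &str = "invalidate";

    async fn drop_published_keys(pool: &RedisConnectionPool) {
        // `subscriber` is public now, so the consuming crate decides what a message means.
        // The Ok type is pinned to usize, matching what the removed trait surfaced.
        let subscribed: Result<usize, _> = pool.subscriber.subscribe(CHANNEL).await;
        if subscribed.is_err() {
            return;
        }
        let mut messages = pool.subscriber.on_message();
        while let Some((_channel, value)) = messages.next().await {
            if let Some(key) = value.as_string() {
                // Mirror the removed handler: drop the published key from Redis.
                let _ = pool.delete_key(&key).await;
            }
        }
    }

The router re-creates the removed trait later in this commit, adding the in-memory invalidation step to on_message.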

View File

@@ -1,7 +1,17 @@
use std::any::Any;
use std::{any::Any, sync::Arc};
use dyn_clone::DynClone;
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
/// Time to live 30 mins
const CACHE_TTL: u64 = 30 * 60;
/// Time to idle 10 mins
const CACHE_TTI: u64 = 10 * 60;
/// Config Cache with time_to_live as 30 mins and time_to_idle as 10 mins.
pub static CONFIG_CACHE: Lazy<Cache> = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI));
/// Trait which defines the behaviour of types that are going to be stored in the cache
pub trait Cacheable: Any + Send + Sync + DynClone {
@@ -20,11 +30,11 @@ where
dyn_clone::clone_trait_object!(Cacheable);
pub struct Cache {
inner: MokaCache<String, Box<dyn Cacheable>>,
inner: MokaCache<String, Arc<dyn Cacheable>>,
}
impl std::ops::Deref for Cache {
type Target = MokaCache<String, Box<dyn Cacheable>>;
type Target = MokaCache<String, Arc<dyn Cacheable>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
@@ -45,7 +55,11 @@ impl Cache {
}
}
pub fn get_val<T: Clone + Any>(&self, key: &str) -> Option<T> {
pub async fn push<T: Cacheable>(&self, key: String, val: T) {
self.insert(key, Arc::new(val)).await;
}
pub fn get_val<T: Clone + Cacheable>(&self, key: &str) -> Option<T> {
let val = self.get(key)?;
(*val).as_any().downcast_ref::<T>().cloned()
}
@@ -58,9 +72,7 @@ mod cache_tests {
#[tokio::test]
async fn construct_and_get_cache() {
let cache = Cache::new(1800, 1800);
cache
.insert("key".to_string(), Box::new("val".to_string()))
.await;
cache.push("key".to_string(), "val".to_string()).await;
assert_eq!(cache.get_val::<String>("key"), Some(String::from("val")));
}
}
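
This hunk switches the cached values from Box<dyn Cacheable> to Arc<dyn Cacheable>, so get_val can hand back clones cheaply, and adds the push/get_val pair used by the new helpers. The constructor and the rest of the Cacheable plumbing sit outside the hunk; the following is only a sketch of how such a wrapper is typically assembled on moka's builder, and the crate's real definitions may differ in detail:

    use std::{any::Any, sync::Arc, time::Duration};

    use dyn_clone::DynClone;
    use moka::future::Cache as MokaCache;

    pub trait Cacheable: Any + Send + Sync + DynClone {
        fn as_any(&self) -> &dyn Any;
    }

    // Blanket impl: any clonable, thread-safe, 'static type can be cached.
    impl<T> Cacheable for T
    where
        T: Any + Clone + Send + Sync,
    {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    dyn_clone::clone_trait_object!(Cacheable);

    pub struct Cache {
        inner: MokaCache<String, Arc<dyn Cacheable>>,
    }

    impl Cache {
        /// `time_to_live` and `time_to_idle` are in seconds, matching CACHE_TTL / CACHE_TTI.
        pub fn new(time_to_live: u64, time_to_idle: u64) -> Self {
            Self {
                inner: MokaCache::builder()
                    .time_to_live(Duration::from_secs(time_to_live))
                    .time_to_idle(Duration::from_secs(time_to_idle))
                    .build(),
            }
        }
    }

get_val downcasts the stored trait object back to the concrete type via as_any, which is why its bound tightens from Any to Cacheable in this hunk.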

View File

@@ -1,9 +1,15 @@
use common_utils::ext_traits::AsyncExt;
use error_stack::ResultExt;
use super::Store;
use crate::core::errors::{self, CustomResult};
use crate::{
cache::{self, Cacheable},
consts,
core::errors::{self, CustomResult},
services::PubSubInterface,
};
pub async fn get_or_populate_cache<T, F, Fut>(
pub async fn get_or_populate_redis<T, F, Fut>(
store: &Store,
key: &str,
fun: F,
@@ -36,10 +42,52 @@ where
}
}
pub async fn get_or_populate_in_memory<T, F, Fut>(
store: &Store,
key: &str,
fun: F,
cache: &cache::Cache,
) -> CustomResult<T, errors::StorageError>
where
T: Cacheable + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone,
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{
let cache_val = cache.get_val::<T>(key);
if let Some(val) = cache_val {
Ok(val)
} else {
let val = get_or_populate_redis(store, key, fun).await?;
cache.push(key.to_string(), val.clone()).await;
Ok(val)
}
}
pub async fn redact_cache<T, F, Fut>(
store: &Store,
key: &str,
fun: F,
in_memory: Option<&cache::Cache>,
) -> CustomResult<T, errors::StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{
let data = fun().await?;
in_memory.async_map(|cache| cache.invalidate(key)).await;
store
.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.delete_key(key)
.await
.change_context(errors::StorageError::KVError)?;
Ok(data)
}
pub async fn publish_and_redact<T, F, Fut>(
store: &Store,
key: &str,
fun: F,
) -> CustomResult<T, errors::StorageError>
where
F: FnOnce() -> Fut + Send,
@@ -49,7 +97,7 @@ where
store
.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.delete_key(key)
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.change_context(errors::StorageError::KVError)?;
Ok(data)
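
Taken together, the helpers form three tiers: get_or_populate_in_memory checks the process-local cache first, falls through to get_or_populate_redis (the renamed Redis-backed helper above), and only then runs the database closure, populating each layer on the way back out. A hypothetical caller, assumed to live in the same module so that Store, cache, and errors are already in scope; load_from_db stands in for a real diesel query and is not part of this commit:

    use crate::types::storage;

    // Stand-in for the real database query; hypothetical.
    async fn load_from_db(
        _store: &Store,
        _key: &str,
    ) -> CustomResult<storage::Config, errors::StorageError> {
        todo!("real diesel query goes here")
    }

    async fn cached_config(
        store: &Store,
        key: &str,
    ) -> CustomResult<storage::Config, errors::StorageError> {
        get_or_populate_in_memory(
            store,
            key,
            // Runs only when both the in-memory cache and Redis miss; the result is then
            // written back to Redis and to CONFIG_CACHE on the way out.
            || async { load_from_db(store, key).await },
            &cache::CONFIG_CACHE,
        )
        .await
    }

Of the two invalidation helpers, redact_cache clears the Redis copy plus, through its new Option<&cache::Cache> argument, at most one local in-memory cache on the current node, while publish_and_redact publishes the key on consts::PUB_SUB_CHANNEL and leaves the clearing to every node's subscriber (shown further down in this commit).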

View File

@@ -2,8 +2,10 @@ use error_stack::IntoReport;
use super::{cache, MockDb, Store};
use crate::{
connection,
cache::CONFIG_CACHE,
connection, consts,
core::errors::{self, CustomResult},
services::PubSubInterface,
types::storage,
};
@@ -76,26 +78,31 @@ impl ConfigInterface for Store {
key: &str,
config_update: storage::ConfigUpdate,
) -> CustomResult<storage::Config, errors::StorageError> {
cache::redact_cache(self, key, || async {
self.update_config_by_key(key, config_update).await
})
.await
cache::publish_and_redact(self, key, || self.update_config_by_key(key, config_update)).await
}
async fn find_config_by_key_cached(
&self,
key: &str,
) -> CustomResult<storage::Config, errors::StorageError> {
cache::get_or_populate_cache(self, key, || async { self.find_config_by_key(key).await })
cache::get_or_populate_in_memory(self, key, || self.find_config_by_key(key), &CONFIG_CACHE)
.await
}
async fn delete_config_by_key(&self, key: &str) -> CustomResult<bool, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::Config::delete_by_key(&conn, key)
let deleted = storage::Config::delete_by_key(&conn, key)
.await
.map_err(Into::into)
.into_report()
.into_report()?;
self.redis_conn()
.map_err(Into::<errors::StorageError>::into)?
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.map_err(Into::<errors::StorageError>::into)?;
Ok(deleted)
}
}
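
Put together, the config interface now reads through memory → Redis → Postgres and invalidates by publishing, so every node, including the one doing the write, drops both its Redis copy and its CONFIG_CACHE entry. A test-style sketch of how application code sees this, assuming the ConfigInterface trait and the module's usual imports are in scope; the key is made up:

    async fn read_then_invalidate(store: &Store) -> CustomResult<(), errors::StorageError> {
        let key = "dummy_key"; // hypothetical key, for illustration only

        // First read misses CONFIG_CACHE and Redis, loads from Postgres, and populates both.
        let _config = store.find_config_by_key_cached(key).await?;

        // Second read is served straight from the in-memory cache.
        let _config = store.find_config_by_key_cached(key).await?;

        // Deleting removes the row and then publishes `key` on PUB_SUB_CHANNEL, so every
        // node's subscriber drops the Redis copy and evicts the CONFIG_CACHE entry.
        let _deleted: bool = store.delete_config_by_key(key).await?;
        Ok(())
    }

Updates follow the same pattern through publish_and_redact, as the hunk above shows.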

View File

@@ -75,7 +75,7 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_cache(self, merchant_id, fetch_func).await
super::cache::get_or_populate_redis(self, merchant_id, fetch_func).await
}
}
@@ -100,7 +100,7 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::redact_cache(self, &_merchant_id, update_func).await
super::cache::redact_cache(self, &_merchant_id, update_func, None).await
}
}
@@ -128,7 +128,7 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::redact_cache(self, merchant_id, update_func).await
super::cache::redact_cache(self, merchant_id, update_func, None).await
}
}
@@ -162,7 +162,7 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::redact_cache(self, merchant_id, delete_func).await
super::cache::redact_cache(self, merchant_id, delete_func, None).await
}
}
}

View File

@@ -175,7 +175,7 @@ impl MerchantConnectorAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_cache(self, merchant_connector_id, find_call).await
super::cache::get_or_populate_redis(self, merchant_connector_id, find_call).await
}
}
@@ -215,7 +215,7 @@ impl MerchantConnectorAccountInterface for Store {
#[cfg(feature = "accounts_cache")]
{
super::cache::redact_cache(self, &_merchant_connector_id, update_call).await
super::cache::redact_cache(self, &_merchant_connector_id, update_call, None).await
}
#[cfg(not(feature = "accounts_cache"))]

View File

@@ -40,7 +40,7 @@ impl ReverseLookupInterface for Store {
.map_err(Into::into)
.into_report()
};
cache::get_or_populate_cache(self, id, database_call).await
cache::get_or_populate_redis(self, id, database_call).await
}
}

View File

@@ -7,16 +7,73 @@ pub mod logger;
use std::sync::{atomic, Arc};
use redis_interface::{errors::RedisError, PubSubInterface};
use error_stack::{IntoReport, ResultExt};
use futures::StreamExt;
use redis_interface::{errors as redis_errors, PubsubInterface};
pub use self::{api::*, encryption::*};
use crate::{
async_spawn,
cache::CONFIG_CACHE,
connection::{diesel_make_pg_pool, PgPool},
consts,
core::errors,
};
#[async_trait::async_trait]
pub trait PubSubInterface {
async fn subscribe(
&self,
channel: &str,
) -> errors::CustomResult<usize, redis_errors::RedisError>;
async fn publish(
&self,
channel: &str,
key: &str,
) -> errors::CustomResult<usize, redis_errors::RedisError>;
async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError>;
}
#[async_trait::async_trait]
impl PubSubInterface for redis_interface::RedisConnectionPool {
#[inline]
async fn subscribe(
&self,
channel: &str,
) -> errors::CustomResult<usize, redis_errors::RedisError> {
self.subscriber
.subscribe(channel)
.await
.into_report()
.change_context(redis_errors::RedisError::SubscribeError)
}
#[inline]
async fn publish(
&self,
channel: &str,
key: &str,
) -> errors::CustomResult<usize, redis_errors::RedisError> {
self.publisher
.publish(channel, key)
.await
.into_report()
.change_context(redis_errors::RedisError::SubscribeError)
}
#[inline]
async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> {
let mut message = self.subscriber.on_message();
while let Some((_, key)) = message.next().await {
let key = key
.as_string()
.ok_or::<redis_errors::RedisError>(redis_errors::RedisError::DeleteFailed)?;
self.delete_key(&key).await?;
CONFIG_CACHE.invalidate(&key).await;
}
Ok(())
}
}
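
For this handler to run, something has to subscribe to consts::PUB_SUB_CHANNEL and drive on_message; the async_spawn and consts imports added at the top of this file suggest that happens at application startup, but the call site is not part of the hunks shown here. The following is only a sketch of the shape such wiring could take: the function name is made up, and plain tokio::spawn is used rather than guessing the exact form of the crate's async_spawn helper:

    async fn start_invalidation_listener(
        store: &Store,
    ) -> errors::CustomResult<(), redis_errors::RedisError> {
        let redis = store.redis_conn()?;
        redis.subscribe(consts::PUB_SUB_CHANNEL).await?;
        tokio::spawn(async move {
            // Runs for the lifetime of the process: every published key is deleted from Redis
            // and evicted from CONFIG_CACHE by the `on_message` impl above. Errors are ignored
            // here only to keep the sketch short.
            let _ = redis.on_message().await;
        });
        Ok(())
    }

Combined with the publishing side in the config interface, this closes the loop: a config write on any node evicts stale entries from every node within one pub/sub round-trip.
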
#[derive(Clone)]
pub struct Store {
pub master_pool: PgPool,
@@ -74,7 +131,8 @@ impl Store {
pub fn redis_conn(
&self,
) -> errors::CustomResult<Arc<redis_interface::RedisConnectionPool>, RedisError> {
) -> errors::CustomResult<Arc<redis_interface::RedisConnectionPool>, redis_errors::RedisError>
{
if self
.redis_conn
.is_redis_available
@@ -82,7 +140,7 @@ {
{
Ok(self.redis_conn.clone())
} else {
Err(RedisError::RedisConnectionError.into())
Err(redis_errors::RedisError::RedisConnectionError.into())
}
}
@@ -95,8 +153,6 @@ impl Store {
where
T: crate::utils::storage_partitioning::KvStorePartition,
{
use error_stack::ResultExt;
let shard_key = T::shard_key(partition_key, self.config.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.redis_conn