diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index 504cb8fccf..f4fc517d12 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -25,7 +25,8 @@ use std::sync::{atomic, Arc}; use common_utils::errors::CustomResult; use error_stack::{IntoReport, ResultExt}; -use fred::interfaces::{ClientLike, PubsubInterface}; +use fred::interfaces::ClientLike; +pub use fred::interfaces::PubsubInterface; use futures::StreamExt; use router_env::logger; @@ -35,8 +36,8 @@ pub struct RedisConnectionPool { pub pool: fred::pool::RedisPool, config: RedisConfig, join_handles: Vec, - subscriber: RedisClient, - publisher: RedisClient, + pub subscriber: RedisClient, + pub publisher: RedisClient, pub is_redis_available: Arc, } @@ -153,44 +154,6 @@ impl RedisConnectionPool { } } -#[async_trait::async_trait] -pub trait PubSubInterface { - async fn subscribe(&self, channel: &str) -> CustomResult; - async fn publish(&self, channel: &str, key: &str) -> CustomResult; - async fn on_message(&self) -> CustomResult<(), errors::RedisError>; -} - -#[async_trait::async_trait] -impl PubSubInterface for RedisConnectionPool { - #[inline] - async fn subscribe(&self, channel: &str) -> CustomResult { - self.subscriber - .subscribe(channel) - .await - .into_report() - .change_context(errors::RedisError::SubscribeError) - } - #[inline] - async fn publish(&self, channel: &str, key: &str) -> CustomResult { - self.publisher - .publish(channel, key) - .await - .into_report() - .change_context(errors::RedisError::SubscribeError) - } - #[inline] - async fn on_message(&self) -> CustomResult<(), errors::RedisError> { - let mut message = self.subscriber.on_message(); - while let Some((_, key)) = message.next().await { - let key = key - .as_string() - .ok_or::(errors::RedisError::DeleteFailed)?; - self.delete_key(&key).await?; - } - Ok(()) - } -} - struct RedisConfig { default_ttl: u32, default_stream_read_count: u64, diff --git a/crates/router/src/cache.rs b/crates/router/src/cache.rs index bc7580c0d8..27c3629524 100644 --- a/crates/router/src/cache.rs +++ b/crates/router/src/cache.rs @@ -1,7 +1,17 @@ -use std::any::Any; +use std::{any::Any, sync::Arc}; use dyn_clone::DynClone; use moka::future::Cache as MokaCache; +use once_cell::sync::Lazy; + +/// Time to live 30 mins +const CACHE_TTL: u64 = 30 * 60; + +/// Time to idle 10 mins +const CACHE_TTI: u64 = 10 * 60; + +/// Config Cache with time_to_live as 30 mins and time_to_idle as 10 mins. +pub static CONFIG_CACHE: Lazy = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI)); /// Trait which defines the behaviour of types that's gonna be stored in Cache pub trait Cacheable: Any + Send + Sync + DynClone { @@ -20,11 +30,11 @@ where dyn_clone::clone_trait_object!(Cacheable); pub struct Cache { - inner: MokaCache>, + inner: MokaCache>, } impl std::ops::Deref for Cache { - type Target = MokaCache>; + type Target = MokaCache>; fn deref(&self) -> &Self::Target { &self.inner } @@ -45,7 +55,11 @@ impl Cache { } } - pub fn get_val(&self, key: &str) -> Option { + pub async fn push(&self, key: String, val: T) { + self.insert(key, Arc::new(val)).await; + } + + pub fn get_val(&self, key: &str) -> Option { let val = self.get(key)?; (*val).as_any().downcast_ref::().cloned() } @@ -58,9 +72,7 @@ mod cache_tests { #[tokio::test] async fn construct_and_get_cache() { let cache = Cache::new(1800, 1800); - cache - .insert("key".to_string(), Box::new("val".to_string())) - .await; + cache.push("key".to_string(), "val".to_string()).await; assert_eq!(cache.get_val::("key"), Some(String::from("val"))); } } diff --git a/crates/router/src/db/cache.rs b/crates/router/src/db/cache.rs index 77b6fe5cba..f616e98e1f 100644 --- a/crates/router/src/db/cache.rs +++ b/crates/router/src/db/cache.rs @@ -1,9 +1,15 @@ +use common_utils::ext_traits::AsyncExt; use error_stack::ResultExt; use super::Store; -use crate::core::errors::{self, CustomResult}; +use crate::{ + cache::{self, Cacheable}, + consts, + core::errors::{self, CustomResult}, + services::PubSubInterface, +}; -pub async fn get_or_populate_cache( +pub async fn get_or_populate_redis( store: &Store, key: &str, fun: F, @@ -36,10 +42,52 @@ where } } +pub async fn get_or_populate_in_memory( + store: &Store, + key: &str, + fun: F, + cache: &cache::Cache, +) -> CustomResult +where + T: Cacheable + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone, + F: FnOnce() -> Fut + Send, + Fut: futures::Future> + Send, +{ + let cache_val = cache.get_val::(key); + if let Some(val) = cache_val { + Ok(val) + } else { + let val = get_or_populate_redis(store, key, fun).await?; + cache.push(key.to_string(), val.clone()).await; + Ok(val) + } +} + pub async fn redact_cache( store: &Store, key: &str, fun: F, + in_memory: Option<&cache::Cache>, +) -> CustomResult +where + F: FnOnce() -> Fut + Send, + Fut: futures::Future> + Send, +{ + let data = fun().await?; + in_memory.async_map(|cache| cache.invalidate(key)).await; + store + .redis_conn() + .map_err(Into::::into)? + .delete_key(key) + .await + .change_context(errors::StorageError::KVError)?; + Ok(data) +} + +pub async fn publish_and_redact( + store: &Store, + key: &str, + fun: F, ) -> CustomResult where F: FnOnce() -> Fut + Send, @@ -49,7 +97,7 @@ where store .redis_conn() .map_err(Into::::into)? - .delete_key(key) + .publish(consts::PUB_SUB_CHANNEL, key) .await .change_context(errors::StorageError::KVError)?; Ok(data) diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index eae44ed0da..9566eb592c 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -2,8 +2,10 @@ use error_stack::IntoReport; use super::{cache, MockDb, Store}; use crate::{ - connection, + cache::CONFIG_CACHE, + connection, consts, core::errors::{self, CustomResult}, + services::PubSubInterface, types::storage, }; @@ -76,26 +78,31 @@ impl ConfigInterface for Store { key: &str, config_update: storage::ConfigUpdate, ) -> CustomResult { - cache::redact_cache(self, key, || async { - self.update_config_by_key(key, config_update).await - }) - .await + cache::publish_and_redact(self, key, || self.update_config_by_key(key, config_update)).await } async fn find_config_by_key_cached( &self, key: &str, ) -> CustomResult { - cache::get_or_populate_cache(self, key, || async { self.find_config_by_key(key).await }) + cache::get_or_populate_in_memory(self, key, || self.find_config_by_key(key), &CONFIG_CACHE) .await } async fn delete_config_by_key(&self, key: &str) -> CustomResult { let conn = connection::pg_connection_write(self).await?; - storage::Config::delete_by_key(&conn, key) + let deleted = storage::Config::delete_by_key(&conn, key) .await .map_err(Into::into) - .into_report() + .into_report()?; + + self.redis_conn() + .map_err(Into::::into)? + .publish(consts::PUB_SUB_CHANNEL, key) + .await + .map_err(Into::::into)?; + + Ok(deleted) } } diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index e6625ad764..97fe3f4541 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -75,7 +75,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_cache(self, merchant_id, fetch_func).await + super::cache::get_or_populate_redis(self, merchant_id, fetch_func).await } } @@ -100,7 +100,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::redact_cache(self, &_merchant_id, update_func).await + super::cache::redact_cache(self, &_merchant_id, update_func, None).await } } @@ -128,7 +128,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::redact_cache(self, merchant_id, update_func).await + super::cache::redact_cache(self, merchant_id, update_func, None).await } } @@ -162,7 +162,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::redact_cache(self, merchant_id, delete_func).await + super::cache::redact_cache(self, merchant_id, delete_func, None).await } } } diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 2154c056f9..f29944a01c 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -175,7 +175,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_cache(self, merchant_connector_id, find_call).await + super::cache::get_or_populate_redis(self, merchant_connector_id, find_call).await } } @@ -215,7 +215,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::redact_cache(self, &_merchant_connector_id, update_call).await + super::cache::redact_cache(self, &_merchant_connector_id, update_call, None).await } #[cfg(not(feature = "accounts_cache"))] diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index fbccc77026..0daa81cf25 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -40,7 +40,7 @@ impl ReverseLookupInterface for Store { .map_err(Into::into) .into_report() }; - cache::get_or_populate_cache(self, id, database_call).await + cache::get_or_populate_redis(self, id, database_call).await } } diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 4173beb59a..38217db629 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -7,16 +7,73 @@ pub mod logger; use std::sync::{atomic, Arc}; -use redis_interface::{errors::RedisError, PubSubInterface}; +use error_stack::{IntoReport, ResultExt}; +use futures::StreamExt; +use redis_interface::{errors as redis_errors, PubsubInterface}; pub use self::{api::*, encryption::*}; use crate::{ async_spawn, + cache::CONFIG_CACHE, connection::{diesel_make_pg_pool, PgPool}, consts, core::errors, }; +#[async_trait::async_trait] +pub trait PubSubInterface { + async fn subscribe( + &self, + channel: &str, + ) -> errors::CustomResult; + async fn publish( + &self, + channel: &str, + key: &str, + ) -> errors::CustomResult; + async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError>; +} + +#[async_trait::async_trait] +impl PubSubInterface for redis_interface::RedisConnectionPool { + #[inline] + async fn subscribe( + &self, + channel: &str, + ) -> errors::CustomResult { + self.subscriber + .subscribe(channel) + .await + .into_report() + .change_context(redis_errors::RedisError::SubscribeError) + } + #[inline] + async fn publish( + &self, + channel: &str, + key: &str, + ) -> errors::CustomResult { + self.publisher + .publish(channel, key) + .await + .into_report() + .change_context(redis_errors::RedisError::SubscribeError) + } + #[inline] + async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> { + let mut message = self.subscriber.on_message(); + while let Some((_, key)) = message.next().await { + let key = key + .as_string() + .ok_or::(redis_errors::RedisError::DeleteFailed)?; + + self.delete_key(&key).await?; + CONFIG_CACHE.invalidate(&key).await; + } + Ok(()) + } +} + #[derive(Clone)] pub struct Store { pub master_pool: PgPool, @@ -74,7 +131,8 @@ impl Store { pub fn redis_conn( &self, - ) -> errors::CustomResult, RedisError> { + ) -> errors::CustomResult, redis_errors::RedisError> + { if self .redis_conn .is_redis_available @@ -82,7 +140,7 @@ impl Store { { Ok(self.redis_conn.clone()) } else { - Err(RedisError::RedisConnectionError.into()) + Err(redis_errors::RedisError::RedisConnectionError.into()) } } @@ -95,8 +153,6 @@ impl Store { where T: crate::utils::storage_partitioning::KvStorePartition, { - use error_stack::ResultExt; - let shard_key = T::shard_key(partition_key, self.config.drainer_num_partitions); let stream_name = self.get_drainer_stream_name(&shard_key); self.redis_conn