feat: add accounts in-memory cache (#1086)

commit da4d721424
parent f31926b833
Author: Kartikeya Hegde
Date:   2023-05-15 18:03:49 +05:30
Committed by: GitHub
8 changed files with 150 additions and 31 deletions

View File

@@ -33,7 +33,7 @@ pub enum ParsingError {
/// Validation errors.
#[allow(missing_docs)] // Only to prevent warnings about struct fields not being documented
-#[derive(Debug, thiserror::Error)]
+#[derive(Debug, thiserror::Error, Clone)]
pub enum ValidationError {
    /// The provided input is missing a required field.
    #[error("Missing required field: {field_name}")]

View File

@@ -58,4 +58,6 @@ pub enum RedisError {
    SubscribeError,
    #[error("Failed to publish to a channel")]
    PublishError,
+    #[error("Failed while receiving message from publisher")]
+    OnMessageError,
}

View File

@@ -5,9 +5,36 @@
use common_utils::errors::CustomResult;
use error_stack::IntoReport;
+use fred::types::RedisValue as FredRedisValue;

use crate::errors;

+pub struct RedisValue {
+    inner: FredRedisValue,
+}
+
+impl std::ops::Deref for RedisValue {
+    type Target = FredRedisValue;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl RedisValue {
+    pub fn new(value: FredRedisValue) -> Self {
+        Self { inner: value }
+    }
+
+    pub fn into_inner(self) -> FredRedisValue {
+        self.inner
+    }
+
+    pub fn from_string(value: String) -> Self {
+        Self {
+            inner: FredRedisValue::String(value.into()),
+        }
+    }
+}
+
#[derive(Debug, serde::Deserialize, Clone)]
#[serde(default)]
pub struct RedisSettings {
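
The wrapper above only forwards to fred's value type. A minimal usage sketch, illustrative and not part of this commit (everything other than the items introduced above is assumed):

    use redis_interface::RedisValue;

    fn demo() {
        // Build the wrapper from an owned String; internally this becomes
        // `FredRedisValue::String`.
        let value = RedisValue::from_string("config,key_1".to_string());

        // `Deref` exposes fred's accessors, e.g. `as_string()`, which the
        // `TryFrom<RedisValue> for CacheKind` impl added below relies on.
        let _decoded = value.as_string();

        // `into_inner()` hands back the raw `fred::types::RedisValue`,
        // e.g. when passing the payload to the publisher.
        let _raw = value.into_inner();
    }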

View File

@@ -1,8 +1,18 @@
-use std::{any::Any, sync::Arc};
+use std::{any::Any, borrow::Cow, sync::Arc};

use dyn_clone::DynClone;
+use error_stack::Report;
use moka::future::Cache as MokaCache;
use once_cell::sync::Lazy;
+use redis_interface::RedisValue;
+
+use crate::core::errors;
+
+/// Prefix for config cache key
+const CONFIG_CACHE_PREFIX: &str = "config";
+
+/// Prefix for accounts cache key
+const ACCOUNTS_CACHE_PREFIX: &str = "accounts";

/// Time to live 30 mins
const CACHE_TTL: u64 = 30 * 60;
@@ -10,14 +20,56 @@ const CACHE_TTL: u64 = 30 * 60;
/// Time to idle 10 mins
const CACHE_TTI: u64 = 10 * 60;

+/// Max Capacity of Cache in MB
+const MAX_CAPACITY: u64 = 30;
+
/// Config Cache with time_to_live as 30 mins and time_to_idle as 10 mins.
-pub static CONFIG_CACHE: Lazy<Cache> = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI));
+pub static CONFIG_CACHE: Lazy<Cache> = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, None));
+
+/// Accounts cache with time_to_live as 30 mins and size limit
+pub static ACCOUNTS_CACHE: Lazy<Cache> =
+    Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));

/// Trait which defines the behaviour of types that's gonna be stored in Cache
pub trait Cacheable: Any + Send + Sync + DynClone {
    fn as_any(&self) -> &dyn Any;
}

+pub enum CacheKind<'a> {
+    Config(Cow<'a, str>),
+    Accounts(Cow<'a, str>),
+}
+
+impl<'a> From<CacheKind<'a>> for RedisValue {
+    fn from(kind: CacheKind<'a>) -> Self {
+        let value = match kind {
+            CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"),
+            CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"),
+        };
+        Self::from_string(value)
+    }
+}
+
+impl<'a> TryFrom<RedisValue> for CacheKind<'a> {
+    type Error = Report<errors::ValidationError>;
+
+    fn try_from(kind: RedisValue) -> Result<Self, Self::Error> {
+        let validation_err = errors::ValidationError::InvalidValue {
+            message: "Invalid publish key provided in pubsub".into(),
+        };
+        let kind = kind.as_string().ok_or(validation_err.clone())?;
+        let mut split = kind.split(',');
+        match split.next().ok_or(validation_err.clone())? {
+            ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(
+                split.next().ok_or(validation_err)?.to_string(),
+            ))),
+            CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(
+                split.next().ok_or(validation_err)?.to_string(),
+            ))),
+            _ => Err(validation_err.into()),
+        }
+    }
+}
+
impl<T> Cacheable for T
where
    T: Any + Clone + Send + Sync,
@@ -45,13 +97,19 @@ impl Cache {
    ///
    /// `time_to_live`: Time in seconds before an object is stored in a caching system before its deleted
    /// `time_to_idle`: Time in seconds before a `get` or `insert` operation an object is stored in a caching system before it's deleted
-    pub fn new(time_to_live: u64, time_to_idle: u64) -> Self {
+    /// `max_capacity`: Max size in MB's that the cache can hold
+    pub fn new(time_to_live: u64, time_to_idle: u64, max_capacity: Option<u64>) -> Self {
+        let mut cache_builder = MokaCache::builder()
+            .eviction_listener_with_queued_delivery_mode(|_, _, _| {})
+            .time_to_live(std::time::Duration::from_secs(time_to_live))
+            .time_to_idle(std::time::Duration::from_secs(time_to_idle));
+
+        if let Some(capacity) = max_capacity {
+            cache_builder = cache_builder.max_capacity(capacity * 1024 * 1024);
+        }
+
        Self {
-            inner: MokaCache::builder()
-                .eviction_listener_with_queued_delivery_mode(|_, _, _| {})
-                .time_to_live(std::time::Duration::from_secs(time_to_live))
-                .time_to_idle(std::time::Duration::from_secs(time_to_idle))
-                .build(),
+            inner: cache_builder.build(),
        }
    }
@@ -71,8 +129,23 @@ mod cache_tests {
    #[tokio::test]
    async fn construct_and_get_cache() {
-        let cache = Cache::new(1800, 1800);
+        let cache = Cache::new(1800, 1800, None);
        cache.push("key".to_string(), "val".to_string()).await;
        assert_eq!(cache.get_val::<String>("key"), Some(String::from("val")));
    }
+
+    #[tokio::test]
+    async fn eviction_on_time_test() {
+        let cache = Cache::new(2, 2, None);
+        cache.push("key".to_string(), "val".to_string()).await;
+        tokio::time::sleep(std::time::Duration::from_secs(3)).await;
+        assert_eq!(cache.get_val::<String>("key"), None);
+    }
+
+    #[tokio::test]
+    async fn eviction_on_size_test() {
+        let cache = Cache::new(2, 2, Some(0));
+        cache.push("key".to_string(), "val".to_string()).await;
+        assert_eq!(cache.get_val::<String>("key"), None);
+    }
}
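
Taken together, a sketch of how the new pieces compose (illustrative only; the module paths and the Result shorthand are assumptions, not part of the commit):

    use std::borrow::Cow;

    use error_stack::Report;
    use redis_interface::RedisValue;

    use crate::{cache::{Cache, CacheKind}, core::errors};

    fn demo() -> Result<(), Report<errors::ValidationError>> {
        // Encoding: CacheKind::Accounts("merchant_1") becomes the string
        // "accounts,merchant_1" via the From impl above.
        let payload: RedisValue = CacheKind::Accounts(Cow::Borrowed("merchant_1")).into();

        // Decoding: the subscriber parses the same string back, which is what
        // `on_message` does for every pub/sub payload.
        let _kind = CacheKind::try_from(payload)?;

        // A cache bounded like ACCOUNTS_CACHE: 30-minute TTL, 10-minute TTI,
        // and a max capacity of 30, scaled to bytes by `Cache::new`.
        let _bounded = Cache::new(30 * 60, 10 * 60, Some(30));
        Ok(())
    }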

View File

@@ -87,9 +87,9 @@ where
    Ok(data)
}

-pub async fn publish_and_redact<T, F, Fut>(
+pub async fn publish_and_redact<'a, T, F, Fut>(
    store: &Store,
-    key: &str,
+    key: cache::CacheKind<'a>,
    fun: F,
) -> CustomResult<T, errors::StorageError>
where
View File

@@ -2,7 +2,7 @@ use error_stack::IntoReport;
use super::{cache, MockDb, Store};
use crate::{
-    cache::CONFIG_CACHE,
+    cache::{CacheKind, CONFIG_CACHE},
    connection, consts,
    core::errors::{self, CustomResult},
    services::PubSubInterface,
@@ -78,7 +78,10 @@ impl ConfigInterface for Store {
        key: &str,
        config_update: storage::ConfigUpdate,
    ) -> CustomResult<storage::Config, errors::StorageError> {
-        cache::publish_and_redact(self, key, || self.update_config_by_key(key, config_update)).await
+        cache::publish_and_redact(self, CacheKind::Config(key.into()), || {
+            self.update_config_by_key(key, config_update)
+        })
+        .await
    }

    async fn find_config_by_key_cached(
@@ -98,7 +101,7 @@ impl ConfigInterface for Store {
        self.redis_conn()
            .map_err(Into::<errors::StorageError>::into)?
-            .publish(consts::PUB_SUB_CHANNEL, key)
+            .publish(consts::PUB_SUB_CHANNEL, CacheKind::Config(key.into()))
            .await
            .map_err(Into::<errors::StorageError>::into)?;

View File

@@ -1,6 +1,8 @@
use error_stack::IntoReport;

use super::{MockDb, Store};
+#[cfg(feature = "accounts_cache")]
+use crate::cache::{self, ACCOUNTS_CACHE};
use crate::{
    connection,
    core::errors::{self, CustomResult},
@@ -75,7 +77,8 @@ impl MerchantAccountInterface for Store {
        #[cfg(feature = "accounts_cache")]
        {
-            super::cache::get_or_populate_redis(self, merchant_id, fetch_func).await
+            super::cache::get_or_populate_in_memory(self, merchant_id, fetch_func, &ACCOUNTS_CACHE)
+                .await
        }
    }
@@ -100,7 +103,12 @@ impl MerchantAccountInterface for Store {
        #[cfg(feature = "accounts_cache")]
        {
-            super::cache::redact_cache(self, &_merchant_id, update_func, None).await
+            super::cache::publish_and_redact(
+                self,
+                cache::CacheKind::Accounts(_merchant_id.into()),
+                update_func,
+            )
+            .await
        }
    }
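
With the accounts_cache feature enabled, the read and write paths above boil down to the sketch below (illustrative; `fetch_from_db` and `update_in_db` are hypothetical closures, and the merchant identifier is a placeholder):

    #[cfg(feature = "accounts_cache")]
    async fn demo(store: &Store) -> CustomResult<storage::MerchantAccount, errors::StorageError> {
        // Read path: serve from the in-memory ACCOUNTS_CACHE, falling back to
        // `fetch_from_db` on a miss and caching its result.
        let account = super::cache::get_or_populate_in_memory(
            store,
            "merchant_1",
            fetch_from_db,
            &ACCOUNTS_CACHE,
        )
        .await?;

        // Write path: run `update_in_db`, then publish an Accounts key so every
        // node (including this one) evicts its cached copy.
        super::cache::publish_and_redact(
            store,
            cache::CacheKind::Accounts("merchant_1".into()),
            update_in_db,
        )
        .await?;

        Ok(account)
    }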

View File

@@ -6,13 +6,13 @@ pub mod logger;
use std::sync::{atomic, Arc};

use error_stack::{IntoReport, ResultExt};
-use redis_interface::{errors as redis_errors, PubsubInterface};
+use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
use tokio::sync::oneshot;

pub use self::{api::*, encryption::*};
use crate::{
    async_spawn,
-    cache::CONFIG_CACHE,
+    cache::{CacheKind, ACCOUNTS_CACHE, CONFIG_CACHE},
    configs::settings,
    connection::{diesel_make_pg_pool, PgPool},
    consts,
@@ -26,10 +26,10 @@ pub trait PubSubInterface {
        channel: &str,
    ) -> errors::CustomResult<usize, redis_errors::RedisError>;

-    async fn publish(
+    async fn publish<'a>(
        &self,
        channel: &str,
-        key: &str,
+        key: CacheKind<'a>,
    ) -> errors::CustomResult<usize, redis_errors::RedisError>;

    async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError>;
@@ -50,13 +50,13 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
    }

    #[inline]
-    async fn publish(
+    async fn publish<'a>(
        &self,
        channel: &str,
-        key: &str,
+        key: CacheKind<'a>,
    ) -> errors::CustomResult<usize, redis_errors::RedisError> {
        self.publisher
-            .publish(channel, key)
+            .publish(channel, RedisValue::from(key).into_inner())
            .await
            .into_report()
            .change_context(redis_errors::RedisError::SubscribeError)
@@ -66,13 +66,19 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
    async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> {
        let mut rx = self.subscriber.on_message();
        while let Ok(message) = rx.recv().await {
-            let key = message
-                .value
-                .as_string()
-                .ok_or::<redis_errors::RedisError>(redis_errors::RedisError::DeleteFailed)?;
-
-            self.delete_key(&key).await?;
-            CONFIG_CACHE.invalidate(&key).await;
+            let key: CacheKind<'_> = RedisValue::new(message.value)
+                .try_into()
+                .change_context(redis_errors::RedisError::OnMessageError)?;
+            match key {
+                CacheKind::Config(key) => {
+                    self.delete_key(key.as_ref()).await?;
+                    CONFIG_CACHE.invalidate(key.as_ref()).await;
+                }
+                CacheKind::Accounts(key) => {
+                    self.delete_key(key.as_ref()).await?;
+                    ACCOUNTS_CACHE.invalidate(key.as_ref()).await;
+                }
+            }
        }
        Ok(())
    }
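
End to end, the invalidation flow now looks roughly like the sketch below (illustrative; imports are elided and the function itself is hypothetical, while PUB_SUB_CHANNEL, publish and on_message come from the code above):

    async fn broadcast_invalidation(
        conn: &redis_interface::RedisConnectionPool,
    ) -> errors::CustomResult<(), redis_errors::RedisError> {
        // Serialised to the string "accounts,merchant_1" through
        // `From<CacheKind> for RedisValue` before reaching the publisher.
        let _receivers = conn
            .publish(consts::PUB_SUB_CHANNEL, CacheKind::Accounts("merchant_1".into()))
            .await?;

        // On every node, the long-running `on_message` loop receives that
        // payload, parses it back into `CacheKind::Accounts("merchant_1")`,
        // deletes the Redis key and invalidates ACCOUNTS_CACHE.
        Ok(())
    }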