fix: invalidation of in-memory cache (#1270)

This commit is contained in:
Kartikeya Hegde
2023-05-25 17:29:31 +05:30
committed by GitHub
parent f0db9937c7
commit e78b3a65d4
3 changed files with 43 additions and 17 deletions

View File

@ -57,14 +57,10 @@ impl<'a> TryFrom<RedisValue> for CacheKind<'a> {
message: "Invalid publish key provided in pubsub".into(), message: "Invalid publish key provided in pubsub".into(),
}; };
let kind = kind.as_string().ok_or(validation_err.clone())?; let kind = kind.as_string().ok_or(validation_err.clone())?;
let mut split = kind.split(','); let split = kind.split_once(',').ok_or(validation_err.clone())?;
match split.next().ok_or(validation_err.clone())? { match split.0 {
ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned( ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))),
split.next().ok_or(validation_err)?.to_string(), CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))),
))),
CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(
split.next().ok_or(validation_err)?.to_string(),
))),
_ => Err(validation_err.into()), _ => Err(validation_err.into()),
} }
} }

View File

@ -136,7 +136,12 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")] #[cfg(feature = "accounts_cache")]
{ {
super::cache::redact_cache(self, merchant_id, update_func, None).await super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(merchant_id.into()),
update_func,
)
.await
} }
} }
@ -170,7 +175,12 @@ impl MerchantAccountInterface for Store {
#[cfg(feature = "accounts_cache")] #[cfg(feature = "accounts_cache")]
{ {
super::cache::redact_cache(self, merchant_id, delete_func, None).await super::cache::publish_and_redact(
self,
cache::CacheKind::Accounts(merchant_id.into()),
delete_func,
)
.await
} }
} }
} }

View File

@ -64,21 +64,38 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
#[inline] #[inline]
async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> { async fn on_message(&self) -> errors::CustomResult<(), redis_errors::RedisError> {
logger::debug!("Started on message");
let mut rx = self.subscriber.on_message(); let mut rx = self.subscriber.on_message();
while let Ok(message) = rx.recv().await { while let Ok(message) = rx.recv().await {
let key: CacheKind<'_> = RedisValue::new(message.value) logger::debug!("Invalidating {message:?}");
let key: CacheKind<'_> = match RedisValue::new(message.value)
.try_into() .try_into()
.change_context(redis_errors::RedisError::OnMessageError)?; .change_context(redis_errors::RedisError::OnMessageError)
match key { {
Ok(value) => value,
Err(err) => {
logger::error!(value_conversion_err=?err);
continue;
}
};
let key = match key {
CacheKind::Config(key) => { CacheKind::Config(key) => {
self.delete_key(key.as_ref()).await?;
CONFIG_CACHE.invalidate(key.as_ref()).await; CONFIG_CACHE.invalidate(key.as_ref()).await;
key
} }
CacheKind::Accounts(key) => { CacheKind::Accounts(key) => {
self.delete_key(key.as_ref()).await?;
ACCOUNTS_CACHE.invalidate(key.as_ref()).await; ACCOUNTS_CACHE.invalidate(key.as_ref()).await;
key
} }
} };
self.delete_key(key.as_ref())
.await
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
.ok();
logger::debug!("Done invalidating {key}");
} }
Ok(()) Ok(())
} }
@ -116,7 +133,10 @@ impl Store {
let subscriber_conn = redis_conn.clone(); let subscriber_conn = redis_conn.clone();
redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await.ok(); if let Err(e) = redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await {
logger::error!(subscribe_err=?e);
}
async_spawn!({ async_spawn!({
if let Err(e) = subscriber_conn.on_message().await { if let Err(e) = subscriber_conn.on_message().await {
logger::error!(pubsub_err=?e); logger::error!(pubsub_err=?e);