mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 09:07:09 +08:00
refactor(redis): spawn one subscriber thread for handling all the published messages to different channel (#5064)
This commit is contained in:
@ -68,6 +68,7 @@ impl RedisClient {
|
|||||||
|
|
||||||
pub struct SubscriberClient {
|
pub struct SubscriberClient {
|
||||||
inner: fred::clients::SubscriberClient,
|
inner: fred::clients::SubscriberClient,
|
||||||
|
pub is_subscriber_handler_spawned: Arc<atomic::AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SubscriberClient {
|
impl SubscriberClient {
|
||||||
@ -83,7 +84,10 @@ impl SubscriberClient {
|
|||||||
.wait_for_connect()
|
.wait_for_connect()
|
||||||
.await
|
.await
|
||||||
.change_context(errors::RedisError::RedisConnectionError)?;
|
.change_context(errors::RedisError::RedisConnectionError)?;
|
||||||
Ok(Self { inner: client })
|
Ok(Self {
|
||||||
|
inner: client,
|
||||||
|
is_subscriber_handler_spawned: Arc::new(atomic::AtomicBool::new(false)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -72,7 +72,7 @@ impl ConfigInterface for Store {
|
|||||||
self.get_redis_conn()
|
self.get_redis_conn()
|
||||||
.map_err(Into::<errors::StorageError>::into)?
|
.map_err(Into::<errors::StorageError>::into)?
|
||||||
.publish(
|
.publish(
|
||||||
cache::PUB_SUB_CHANNEL,
|
cache::IMC_INVALIDATION_CHANNEL,
|
||||||
CacheKind::Config((&inserted.key).into()),
|
CacheKind::Config((&inserted.key).into()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@ -179,7 +179,10 @@ impl ConfigInterface for Store {
|
|||||||
|
|
||||||
self.get_redis_conn()
|
self.get_redis_conn()
|
||||||
.map_err(Into::<errors::StorageError>::into)?
|
.map_err(Into::<errors::StorageError>::into)?
|
||||||
.publish(cache::PUB_SUB_CHANNEL, CacheKind::Config(key.into()))
|
.publish(
|
||||||
|
cache::IMC_INVALIDATION_CHANNEL,
|
||||||
|
CacheKind::Config(key.into()),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(Into::<errors::StorageError>::into)?;
|
.map_err(Into::<errors::StorageError>::into)?;
|
||||||
|
|
||||||
|
|||||||
@ -74,7 +74,7 @@ pub async fn get_store(
|
|||||||
tenant,
|
tenant,
|
||||||
master_enc_key,
|
master_enc_key,
|
||||||
cache_store,
|
cache_store,
|
||||||
storage_impl::redis::cache::PUB_SUB_CHANNEL,
|
storage_impl::redis::cache::IMC_INVALIDATION_CHANNEL,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
|
|||||||
@ -21,7 +21,7 @@ use crate::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
/// Redis channel name used for publishing invalidation messages
|
/// Redis channel name used for publishing invalidation messages
|
||||||
pub const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";
|
pub const IMC_INVALIDATION_CHANNEL: &str = "hyperswitch_invalidate";
|
||||||
|
|
||||||
/// Prefix for config cache key
|
/// Prefix for config cache key
|
||||||
const CONFIG_CACHE_PREFIX: &str = "config";
|
const CONFIG_CACHE_PREFIX: &str = "config";
|
||||||
@ -392,7 +392,7 @@ pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a
|
|||||||
let futures = keys.into_iter().map(|key| async {
|
let futures = keys.into_iter().map(|key| async {
|
||||||
redis_conn
|
redis_conn
|
||||||
.clone()
|
.clone()
|
||||||
.publish(PUB_SUB_CHANNEL, key)
|
.publish(IMC_INVALIDATION_CHANNEL, key)
|
||||||
.await
|
.await
|
||||||
.change_context(StorageError::KVError)
|
.change_context(StorageError::KVError)
|
||||||
});
|
});
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
use std::sync::atomic;
|
||||||
|
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
|
use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
|
||||||
use router_env::{logger, tracing::Instrument};
|
use router_env::{logger, tracing::Instrument};
|
||||||
@ -32,15 +34,29 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
|
|||||||
.await
|
.await
|
||||||
.change_context(redis_errors::RedisError::SubscribeError)?;
|
.change_context(redis_errors::RedisError::SubscribeError)?;
|
||||||
|
|
||||||
let redis_clone = self.clone();
|
// Spawn only one thread handling all the published messages to different channels
|
||||||
let _task_handle = tokio::spawn(
|
if self
|
||||||
async move {
|
.subscriber
|
||||||
if let Err(pubsub_error) = redis_clone.on_message().await {
|
.is_subscriber_handler_spawned
|
||||||
logger::error!(?pubsub_error);
|
.compare_exchange(
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
atomic::Ordering::SeqCst,
|
||||||
|
atomic::Ordering::SeqCst,
|
||||||
|
)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
let redis_clone = self.clone();
|
||||||
|
let _task_handle = tokio::spawn(
|
||||||
|
async move {
|
||||||
|
if let Err(pubsub_error) = redis_clone.on_message().await {
|
||||||
|
logger::error!(?pubsub_error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
.in_current_span(),
|
||||||
.in_current_span(),
|
);
|
||||||
);
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,120 +77,131 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
|
|||||||
logger::debug!("Started on message: {:?}", self.key_prefix);
|
logger::debug!("Started on message: {:?}", self.key_prefix);
|
||||||
let mut rx = self.subscriber.on_message();
|
let mut rx = self.subscriber.on_message();
|
||||||
while let Ok(message) = rx.recv().await {
|
while let Ok(message) = rx.recv().await {
|
||||||
logger::debug!("Invalidating {message:?}");
|
let channel_name = message.channel.to_string();
|
||||||
let key = match CacheKind::try_from(RedisValue::new(message.value))
|
logger::debug!("Received message on channel: {channel_name}");
|
||||||
.change_context(redis_errors::RedisError::OnMessageError)
|
|
||||||
{
|
|
||||||
Ok(value) => value,
|
|
||||||
Err(err) => {
|
|
||||||
logger::error!(value_conversion_err=?err);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let key = match key {
|
match channel_name.as_str() {
|
||||||
CacheKind::Config(key) => {
|
super::cache::IMC_INVALIDATION_CHANNEL => {
|
||||||
CONFIG_CACHE
|
let key = match CacheKind::try_from(RedisValue::new(message.value))
|
||||||
.remove(CacheKey {
|
.change_context(redis_errors::RedisError::OnMessageError)
|
||||||
key: key.to_string(),
|
{
|
||||||
prefix: self.key_prefix.clone(),
|
Ok(value) => value,
|
||||||
})
|
Err(err) => {
|
||||||
.await;
|
logger::error!(value_conversion_err=?err);
|
||||||
key
|
continue;
|
||||||
}
|
}
|
||||||
CacheKind::Accounts(key) => {
|
};
|
||||||
ACCOUNTS_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
key
|
|
||||||
}
|
|
||||||
CacheKind::CGraph(key) => {
|
|
||||||
CGRAPH_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
key
|
|
||||||
}
|
|
||||||
CacheKind::Routing(key) => {
|
|
||||||
ROUTING_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
key
|
|
||||||
}
|
|
||||||
CacheKind::DecisionManager(key) => {
|
|
||||||
DECISION_MANAGER_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
key
|
|
||||||
}
|
|
||||||
CacheKind::Surcharge(key) => {
|
|
||||||
SURCHARGE_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
key
|
|
||||||
}
|
|
||||||
CacheKind::All(key) => {
|
|
||||||
CONFIG_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
ACCOUNTS_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
CGRAPH_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
ROUTING_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
DECISION_MANAGER_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
SURCHARGE_CACHE
|
|
||||||
.remove(CacheKey {
|
|
||||||
key: key.to_string(),
|
|
||||||
prefix: self.key_prefix.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
key
|
let key = match key {
|
||||||
|
CacheKind::Config(key) => {
|
||||||
|
CONFIG_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::Accounts(key) => {
|
||||||
|
ACCOUNTS_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::CGraph(key) => {
|
||||||
|
CGRAPH_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::Routing(key) => {
|
||||||
|
ROUTING_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::DecisionManager(key) => {
|
||||||
|
DECISION_MANAGER_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::Surcharge(key) => {
|
||||||
|
SURCHARGE_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::All(key) => {
|
||||||
|
CONFIG_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
ACCOUNTS_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
CGRAPH_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
ROUTING_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
DECISION_MANAGER_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
SURCHARGE_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
key
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.delete_key(key.as_ref())
|
||||||
|
.await
|
||||||
|
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
|
||||||
|
.ok();
|
||||||
|
|
||||||
|
logger::debug!(
|
||||||
|
"Handled message on channel {channel_name} - Done invalidating {key}"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
};
|
_ => {
|
||||||
|
logger::debug!("Received message from unknown channel: {channel_name}");
|
||||||
self.delete_key(key.as_ref())
|
}
|
||||||
.await
|
}
|
||||||
.map_err(|err| logger::error!("Error while deleting redis key: {err:?}"))
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
logger::debug!("Done invalidating {key}");
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user