feat: Add support for a redis pubsub interface (#614)

Co-authored-by: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com>
This commit is contained in:
Kartikeya Hegde
2023-02-23 18:04:19 +05:30
committed by GitHub
parent 8ee097ea21
commit aaf372505c
7 changed files with 103 additions and 3 deletions

1
Cargo.lock generated
View File

@ -2936,6 +2936,7 @@ dependencies = [
name = "redis_interface" name = "redis_interface"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"common_utils", "common_utils",
"error-stack", "error-stack",
"fred", "fred",

View File

@ -8,6 +8,7 @@ readme = "README.md"
license = "Apache-2.0" license = "Apache-2.0"
[dependencies] [dependencies]
async-trait = "0.1.63"
error-stack = "0.2.4" error-stack = "0.2.4"
fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] } fred = { version = "5.2.0", features = ["metrics", "partial-tracing"] }
futures = "0.3" futures = "0.3"

View File

@ -54,4 +54,8 @@ pub enum RedisError {
InvalidRedisEntryId, InvalidRedisEntryId,
#[error("Failed to establish Redis connection")] #[error("Failed to establish Redis connection")]
RedisConnectionError, RedisConnectionError,
#[error("Failed to subscribe to a channel")]
SubscribeError,
#[error("Failed to publish to a channel")]
PublishError,
} }

View File

@ -25,7 +25,7 @@ use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult; use common_utils::errors::CustomResult;
use error_stack::{IntoReport, ResultExt}; use error_stack::{IntoReport, ResultExt};
use fred::interfaces::ClientLike; use fred::interfaces::{ClientLike, PubsubInterface};
use futures::StreamExt; use futures::StreamExt;
use router_env::logger; use router_env::logger;
@ -35,9 +35,38 @@ pub struct RedisConnectionPool {
pub pool: fred::pool::RedisPool, pub pool: fred::pool::RedisPool,
config: RedisConfig, config: RedisConfig,
join_handles: Vec<fred::types::ConnectHandle>, join_handles: Vec<fred::types::ConnectHandle>,
subscriber: RedisClient,
publisher: RedisClient,
pub is_redis_available: Arc<atomic::AtomicBool>, pub is_redis_available: Arc<atomic::AtomicBool>,
} }
pub struct RedisClient {
inner: fred::prelude::RedisClient,
}
impl std::ops::Deref for RedisClient {
type Target = fred::prelude::RedisClient;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl RedisClient {
pub async fn new(
config: fred::types::RedisConfig,
policy: fred::types::ReconnectPolicy,
) -> CustomResult<Self, errors::RedisError> {
let client = fred::prelude::RedisClient::new(config);
client.connect(Some(policy));
client
.wait_for_connect()
.await
.into_report()
.change_context(errors::RedisError::RedisConnectionError)?;
Ok(Self { inner: client })
}
}
impl RedisConnectionPool { impl RedisConnectionPool {
/// Create a new Redis connection /// Create a new Redis connection
pub async fn new(conf: &RedisSettings) -> CustomResult<Self, errors::RedisError> { pub async fn new(conf: &RedisSettings) -> CustomResult<Self, errors::RedisError> {
@ -72,6 +101,11 @@ impl RedisConnectionPool {
conf.reconnect_max_attempts, conf.reconnect_max_attempts,
conf.reconnect_delay, conf.reconnect_delay,
); );
let subscriber = RedisClient::new(config.clone(), policy.clone()).await?;
let publisher = RedisClient::new(config.clone(), policy.clone()).await?;
let pool = fred::pool::RedisPool::new(config, conf.pool_size) let pool = fred::pool::RedisPool::new(config, conf.pool_size)
.into_report() .into_report()
.change_context(errors::RedisError::RedisConnectionError)?; .change_context(errors::RedisError::RedisConnectionError)?;
@ -89,6 +123,8 @@ impl RedisConnectionPool {
config, config,
join_handles, join_handles,
is_redis_available: Arc::new(atomic::AtomicBool::new(true)), is_redis_available: Arc::new(atomic::AtomicBool::new(true)),
subscriber,
publisher,
}) })
} }
@ -117,6 +153,44 @@ impl RedisConnectionPool {
} }
} }
#[async_trait::async_trait]
pub trait PubSubInterface {
async fn subscribe(&self, channel: &str) -> CustomResult<usize, errors::RedisError>;
async fn publish(&self, channel: &str, key: &str) -> CustomResult<usize, errors::RedisError>;
async fn on_message(&self) -> CustomResult<(), errors::RedisError>;
}
#[async_trait::async_trait]
impl PubSubInterface for RedisConnectionPool {
#[inline]
async fn subscribe(&self, channel: &str) -> CustomResult<usize, errors::RedisError> {
self.subscriber
.subscribe(channel)
.await
.into_report()
.change_context(errors::RedisError::SubscribeError)
}
#[inline]
async fn publish(&self, channel: &str, key: &str) -> CustomResult<usize, errors::RedisError> {
self.publisher
.publish(channel, key)
.await
.into_report()
.change_context(errors::RedisError::SubscribeError)
}
#[inline]
async fn on_message(&self) -> CustomResult<(), errors::RedisError> {
let mut message = self.subscriber.on_message();
while let Some((_, key)) = message.next().await {
let key = key
.as_string()
.ok_or::<errors::RedisError>(errors::RedisError::DeleteFailed)?;
self.delete_key(&key).await?;
}
Ok(())
}
}
struct RedisConfig { struct RedisConfig {
default_ttl: u32, default_ttl: u32,
default_stream_read_count: u64, default_stream_read_count: u64,

View File

@ -23,3 +23,4 @@ pub(crate) const BASE64_ENGINE_URL_SAFE: base64::engine::GeneralPurpose =
base64::engine::general_purpose::URL_SAFE; base64::engine::general_purpose::URL_SAFE;
pub(crate) const API_KEY_LENGTH: usize = 64; pub(crate) const API_KEY_LENGTH: usize = 64;
pub(crate) const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate";

View File

@ -44,3 +44,10 @@ macro_rules! newtype {
$crate::newtype_impl!($is_pub, $name, $ty_path); $crate::newtype_impl!($is_pub, $name, $ty_path);
}; };
} }
#[macro_export]
macro_rules! async_spawn {
($t:block) => {
tokio::spawn(async move { $t });
};
}

View File

@ -6,13 +6,15 @@ pub mod logger;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use redis_interface::errors::RedisError; use redis_interface::{errors::RedisError, PubSubInterface};
pub use self::api::*; pub use self::api::*;
#[cfg(feature = "basilisk")] #[cfg(feature = "basilisk")]
pub use self::encryption::*; pub use self::encryption::*;
use crate::{ use crate::{
async_spawn,
connection::{diesel_make_pg_pool, PgPool}, connection::{diesel_make_pg_pool, PgPool},
consts,
core::errors, core::errors,
}; };
@ -38,7 +40,17 @@ impl Store {
let redis_conn = Arc::new(crate::connection::redis_connection(config).await); let redis_conn = Arc::new(crate::connection::redis_connection(config).await);
let redis_clone = redis_conn.clone(); let redis_clone = redis_conn.clone();
tokio::spawn(async move { let subscriber_conn = redis_conn.clone();
redis_conn.subscribe(consts::PUB_SUB_CHANNEL).await.ok();
async_spawn!({
if let Err(e) = subscriber_conn.on_message().await {
logger::error!(pubsub_err=?e);
}
});
async_spawn!({
redis_clone.on_error().await; redis_clone.on_error().await;
}); });