mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-28 12:15:40 +08:00
feat: use subscriber client for subscription in pubsub (#1297)
This commit is contained in:
@ -10,7 +10,7 @@ license = "Apache-2.0"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1.68"
|
async-trait = "0.1.68"
|
||||||
error-stack = "0.3.1"
|
error-stack = "0.3.1"
|
||||||
fred = { version = "6.0.0", features = ["metrics", "partial-tracing"] }
|
fred = { version = "6.3.0", features = ["metrics", "partial-tracing","subscriber-client"] }
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
serde = { version = "1.0.160", features = ["derive"] }
|
serde = { version = "1.0.160", features = ["derive"] }
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
|
|||||||
@ -35,7 +35,7 @@ pub struct RedisConnectionPool {
|
|||||||
pub pool: fred::pool::RedisPool,
|
pub pool: fred::pool::RedisPool,
|
||||||
config: RedisConfig,
|
config: RedisConfig,
|
||||||
join_handles: Vec<fred::types::ConnectHandle>,
|
join_handles: Vec<fred::types::ConnectHandle>,
|
||||||
pub subscriber: RedisClient,
|
pub subscriber: SubscriberClient,
|
||||||
pub publisher: RedisClient,
|
pub publisher: RedisClient,
|
||||||
pub is_redis_available: Arc<atomic::AtomicBool>,
|
pub is_redis_available: Arc<atomic::AtomicBool>,
|
||||||
}
|
}
|
||||||
@ -67,6 +67,33 @@ impl RedisClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct SubscriberClient {
|
||||||
|
inner: fred::clients::SubscriberClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriberClient {
|
||||||
|
pub async fn new(
|
||||||
|
config: fred::types::RedisConfig,
|
||||||
|
reconnect_policy: fred::types::ReconnectPolicy,
|
||||||
|
) -> CustomResult<Self, errors::RedisError> {
|
||||||
|
let client = fred::clients::SubscriberClient::new(config, None, Some(reconnect_policy));
|
||||||
|
client.connect();
|
||||||
|
client
|
||||||
|
.wait_for_connect()
|
||||||
|
.await
|
||||||
|
.into_report()
|
||||||
|
.change_context(errors::RedisError::RedisConnectionError)?;
|
||||||
|
Ok(Self { inner: client })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Deref for SubscriberClient {
|
||||||
|
type Target = fred::clients::SubscriberClient;
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl RedisConnectionPool {
|
impl RedisConnectionPool {
|
||||||
/// Create a new Redis connection
|
/// Create a new Redis connection
|
||||||
pub async fn new(conf: &RedisSettings) -> CustomResult<Self, errors::RedisError> {
|
pub async fn new(conf: &RedisSettings) -> CustomResult<Self, errors::RedisError> {
|
||||||
@ -102,7 +129,7 @@ impl RedisConnectionPool {
|
|||||||
conf.reconnect_delay,
|
conf.reconnect_delay,
|
||||||
);
|
);
|
||||||
|
|
||||||
let subscriber = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
|
let subscriber = SubscriberClient::new(config.clone(), reconnect_policy.clone()).await?;
|
||||||
|
|
||||||
let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
|
let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
|
||||||
|
|
||||||
@ -130,6 +157,19 @@ impl RedisConnectionPool {
|
|||||||
|
|
||||||
pub async fn close_connections(&mut self) {
|
pub async fn close_connections(&mut self) {
|
||||||
self.pool.quit_pool().await;
|
self.pool.quit_pool().await;
|
||||||
|
|
||||||
|
self.publisher
|
||||||
|
.quit()
|
||||||
|
.await
|
||||||
|
.map_err(|err| logger::error!(redis_quit_err=?err))
|
||||||
|
.ok();
|
||||||
|
|
||||||
|
self.subscriber
|
||||||
|
.quit()
|
||||||
|
.await
|
||||||
|
.map_err(|err| logger::error!(redis_quit_err=?err))
|
||||||
|
.ok();
|
||||||
|
|
||||||
for handle in self.join_handles.drain(..) {
|
for handle in self.join_handles.drain(..) {
|
||||||
match handle.await {
|
match handle.await {
|
||||||
Ok(Ok(_)) => (),
|
Ok(Ok(_)) => (),
|
||||||
|
|||||||
@ -42,6 +42,9 @@ impl PubSubInterface for redis_interface::RedisConnectionPool {
|
|||||||
&self,
|
&self,
|
||||||
channel: &str,
|
channel: &str,
|
||||||
) -> errors::CustomResult<usize, redis_errors::RedisError> {
|
) -> errors::CustomResult<usize, redis_errors::RedisError> {
|
||||||
|
// Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client.
|
||||||
|
self.subscriber.manage_subscriptions();
|
||||||
|
|
||||||
self.subscriber
|
self.subscriber
|
||||||
.subscribe(channel)
|
.subscribe(channel)
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user