mirror of
				https://github.com/juspay/hyperswitch.git
				synced 2025-10-31 18:17:13 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			198 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			198 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| //! Intermediate module for encapsulate all the redis related functionality
 | |
| //!
 | |
| //! Provides structs to represent redis connection and all functions that redis provides and
 | |
| //! are used in the `router` crate. Abstractions for creating a new connection while also facilitating
 | |
| //! redis connection pool and configuration based types.
 | |
| //!
 | |
| //!  # Examples
 | |
| //! ```
 | |
| //! use redis_interface::{types::RedisSettings, RedisConnectionPool};
 | |
| //!
 | |
| //! #[tokio::main]
 | |
| //! async fn main() {
 | |
| //!     let redis_conn = RedisConnectionPool::new(&RedisSettings::default()).await;
 | |
| //!     // ... redis_conn ready to use
 | |
| //! }
 | |
| //! ```
 | |
| #![forbid(unsafe_code)]
 | |
| 
 | |
| pub mod commands;
 | |
| pub mod errors;
 | |
| pub mod types;
 | |
| 
 | |
| use std::sync::{atomic, Arc};
 | |
| 
 | |
| use common_utils::errors::CustomResult;
 | |
| use error_stack::{IntoReport, ResultExt};
 | |
| use fred::interfaces::ClientLike;
 | |
| pub use fred::interfaces::PubsubInterface;
 | |
| use router_env::logger;
 | |
| 
 | |
| pub use self::{commands::*, types::*};
 | |
| 
 | |
| pub struct RedisConnectionPool {
 | |
|     pub pool: fred::pool::RedisPool,
 | |
|     config: RedisConfig,
 | |
|     pub subscriber: SubscriberClient,
 | |
|     pub publisher: RedisClient,
 | |
|     pub is_redis_available: Arc<atomic::AtomicBool>,
 | |
| }
 | |
| 
 | |
| pub struct RedisClient {
 | |
|     inner: fred::prelude::RedisClient,
 | |
| }
 | |
| 
 | |
| impl std::ops::Deref for RedisClient {
 | |
|     type Target = fred::prelude::RedisClient;
 | |
|     fn deref(&self) -> &Self::Target {
 | |
|         &self.inner
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl RedisClient {
 | |
|     pub async fn new(
 | |
|         config: fred::types::RedisConfig,
 | |
|         reconnect_policy: fred::types::ReconnectPolicy,
 | |
|     ) -> CustomResult<Self, errors::RedisError> {
 | |
|         let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy));
 | |
|         client.connect();
 | |
|         client
 | |
|             .wait_for_connect()
 | |
|             .await
 | |
|             .into_report()
 | |
|             .change_context(errors::RedisError::RedisConnectionError)?;
 | |
|         Ok(Self { inner: client })
 | |
|     }
 | |
| }
 | |
| 
 | |
| pub struct SubscriberClient {
 | |
|     inner: fred::clients::SubscriberClient,
 | |
| }
 | |
| 
 | |
| impl SubscriberClient {
 | |
|     pub async fn new(
 | |
|         config: fred::types::RedisConfig,
 | |
|         reconnect_policy: fred::types::ReconnectPolicy,
 | |
|     ) -> CustomResult<Self, errors::RedisError> {
 | |
|         let client = fred::clients::SubscriberClient::new(config, None, Some(reconnect_policy));
 | |
|         client.connect();
 | |
|         client
 | |
|             .wait_for_connect()
 | |
|             .await
 | |
|             .into_report()
 | |
|             .change_context(errors::RedisError::RedisConnectionError)?;
 | |
|         Ok(Self { inner: client })
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl std::ops::Deref for SubscriberClient {
 | |
|     type Target = fred::clients::SubscriberClient;
 | |
|     fn deref(&self) -> &Self::Target {
 | |
|         &self.inner
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl RedisConnectionPool {
 | |
|     /// Create a new Redis connection
 | |
|     pub async fn new(conf: &RedisSettings) -> CustomResult<Self, errors::RedisError> {
 | |
|         let redis_connection_url = match conf.cluster_enabled {
 | |
|             // Fred relies on this format for specifying cluster where the host port is ignored & only query parameters are used for node addresses
 | |
|             // redis-cluster://username:password@host:port?node=bar.com:30002&node=baz.com:30003
 | |
|             true => format!(
 | |
|                 "redis-cluster://{}:{}?{}",
 | |
|                 conf.host,
 | |
|                 conf.port,
 | |
|                 conf.cluster_urls
 | |
|                     .iter()
 | |
|                     .flat_map(|url| vec!["&", url])
 | |
|                     .skip(1)
 | |
|                     .collect::<String>()
 | |
|             ),
 | |
|             false => format!(
 | |
|                 "redis://{}:{}", //URI Schema
 | |
|                 conf.host, conf.port,
 | |
|             ),
 | |
|         };
 | |
|         let mut config = fred::types::RedisConfig::from_url(&redis_connection_url)
 | |
|             .into_report()
 | |
|             .change_context(errors::RedisError::RedisConnectionError)?;
 | |
| 
 | |
|         if !conf.use_legacy_version {
 | |
|             config.version = fred::types::RespVersion::RESP3;
 | |
|         }
 | |
|         config.tracing = fred::types::TracingConfig::new(true);
 | |
|         config.blocking = fred::types::Blocking::Error;
 | |
|         let reconnect_policy = fred::types::ReconnectPolicy::new_constant(
 | |
|             conf.reconnect_max_attempts,
 | |
|             conf.reconnect_delay,
 | |
|         );
 | |
| 
 | |
|         let subscriber = SubscriberClient::new(config.clone(), reconnect_policy.clone()).await?;
 | |
| 
 | |
|         let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?;
 | |
| 
 | |
|         let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size)
 | |
|             .into_report()
 | |
|             .change_context(errors::RedisError::RedisConnectionError)?;
 | |
| 
 | |
|         pool.connect();
 | |
|         pool.wait_for_connect()
 | |
|             .await
 | |
|             .into_report()
 | |
|             .change_context(errors::RedisError::RedisConnectionError)?;
 | |
| 
 | |
|         let config = RedisConfig::from(conf);
 | |
| 
 | |
|         Ok(Self {
 | |
|             pool,
 | |
|             config,
 | |
|             is_redis_available: Arc::new(atomic::AtomicBool::new(true)),
 | |
|             subscriber,
 | |
|             publisher,
 | |
|         })
 | |
|     }
 | |
| 
 | |
|     pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) {
 | |
|         while let Ok(redis_error) = self.pool.on_error().recv().await {
 | |
|             logger::error!(?redis_error, "Redis protocol or connection error");
 | |
|             logger::error!("current state: {:#?}", self.pool.state());
 | |
|             if self.pool.state() == fred::types::ClientState::Disconnected {
 | |
|                 if tx.send(()).is_err() {
 | |
|                     logger::error!("The redis shutdown signal sender failed to signal");
 | |
|                 }
 | |
|                 self.is_redis_available
 | |
|                     .store(false, atomic::Ordering::SeqCst);
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| struct RedisConfig {
 | |
|     default_ttl: u32,
 | |
|     default_stream_read_count: u64,
 | |
|     default_hash_ttl: u32,
 | |
| }
 | |
| 
 | |
| impl From<&RedisSettings> for RedisConfig {
 | |
|     fn from(config: &RedisSettings) -> Self {
 | |
|         Self {
 | |
|             default_ttl: config.default_ttl,
 | |
|             default_stream_read_count: config.stream_read_count,
 | |
|             default_hash_ttl: config.default_hash_ttl,
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[cfg(test)]
 | |
| mod test {
 | |
|     use super::*;
 | |
| 
 | |
|     #[test]
 | |
|     fn test_redis_error() {
 | |
|         let x = errors::RedisError::ConsumerGroupClaimFailed.to_string();
 | |
| 
 | |
|         assert_eq!(x, "Failed to set Redis stream message owner".to_string())
 | |
|     }
 | |
| }
 | 
