//! Intermediate module for encapsulate all the redis related functionality //! //! Provides structs to represent redis connection and all functions that redis provides and //! are used in the `router` crate. Abstractions for creating a new connection while also facilitating //! redis connection pool and configuration based types. //! //! # Examples //! ``` //! use redis_interface::{types::RedisSettings, RedisConnectionPool}; //! //! #[tokio::main] //! async fn main() { //! let redis_conn = RedisConnectionPool::new(&RedisSettings::default()).await; //! // ... redis_conn ready to use //! } //! ``` #![forbid(unsafe_code)] pub mod commands; pub mod errors; pub mod types; use std::sync::{atomic, Arc}; use common_utils::errors::CustomResult; use error_stack::{IntoReport, ResultExt}; use fred::interfaces::ClientLike; pub use fred::interfaces::PubsubInterface; use router_env::logger; pub use self::{commands::*, types::*}; pub struct RedisConnectionPool { pub pool: fred::pool::RedisPool, config: RedisConfig, pub subscriber: SubscriberClient, pub publisher: RedisClient, pub is_redis_available: Arc, } pub struct RedisClient { inner: fred::prelude::RedisClient, } impl std::ops::Deref for RedisClient { type Target = fred::prelude::RedisClient; fn deref(&self) -> &Self::Target { &self.inner } } impl RedisClient { pub async fn new( config: fred::types::RedisConfig, reconnect_policy: fred::types::ReconnectPolicy, ) -> CustomResult { let client = fred::prelude::RedisClient::new(config, None, Some(reconnect_policy)); client.connect(); client .wait_for_connect() .await .into_report() .change_context(errors::RedisError::RedisConnectionError)?; Ok(Self { inner: client }) } } pub struct SubscriberClient { inner: fred::clients::SubscriberClient, } impl SubscriberClient { pub async fn new( config: fred::types::RedisConfig, reconnect_policy: fred::types::ReconnectPolicy, ) -> CustomResult { let client = fred::clients::SubscriberClient::new(config, None, Some(reconnect_policy)); client.connect(); client .wait_for_connect() .await .into_report() .change_context(errors::RedisError::RedisConnectionError)?; Ok(Self { inner: client }) } } impl std::ops::Deref for SubscriberClient { type Target = fred::clients::SubscriberClient; fn deref(&self) -> &Self::Target { &self.inner } } impl RedisConnectionPool { /// Create a new Redis connection pub async fn new(conf: &RedisSettings) -> CustomResult { let redis_connection_url = match conf.cluster_enabled { // Fred relies on this format for specifying cluster where the host port is ignored & only query parameters are used for node addresses // redis-cluster://username:password@host:port?node=bar.com:30002&node=baz.com:30003 true => format!( "redis-cluster://{}:{}?{}", conf.host, conf.port, conf.cluster_urls .iter() .flat_map(|url| vec!["&", url]) .skip(1) .collect::() ), false => format!( "redis://{}:{}", //URI Schema conf.host, conf.port, ), }; let mut config = fred::types::RedisConfig::from_url(&redis_connection_url) .into_report() .change_context(errors::RedisError::RedisConnectionError)?; if !conf.use_legacy_version { config.version = fred::types::RespVersion::RESP3; } config.tracing = fred::types::TracingConfig::new(true); config.blocking = fred::types::Blocking::Error; let reconnect_policy = fred::types::ReconnectPolicy::new_constant( conf.reconnect_max_attempts, conf.reconnect_delay, ); let subscriber = SubscriberClient::new(config.clone(), reconnect_policy.clone()).await?; let publisher = RedisClient::new(config.clone(), reconnect_policy.clone()).await?; let pool = fred::pool::RedisPool::new(config, None, Some(reconnect_policy), conf.pool_size) .into_report() .change_context(errors::RedisError::RedisConnectionError)?; pool.wait_for_connect() .await .into_report() .change_context(errors::RedisError::RedisConnectionError)?; let config = RedisConfig::from(conf); Ok(Self { pool, config, is_redis_available: Arc::new(atomic::AtomicBool::new(true)), subscriber, publisher, }) } pub async fn on_error(&self, tx: tokio::sync::oneshot::Sender<()>) { while let Ok(redis_error) = self.pool.on_error().recv().await { logger::error!(?redis_error, "Redis protocol or connection error"); logger::error!("current state: {:#?}", self.pool.state()); if self.pool.state() == fred::types::ClientState::Disconnected { if tx.send(()).is_err() { logger::error!("The redis shutdown signal sender failed to signal"); } self.is_redis_available .store(false, atomic::Ordering::SeqCst); break; } } } } struct RedisConfig { default_ttl: u32, default_stream_read_count: u64, default_hash_ttl: u32, } impl From<&RedisSettings> for RedisConfig { fn from(config: &RedisSettings) -> Self { Self { default_ttl: config.default_ttl, default_stream_read_count: config.stream_read_count, default_hash_ttl: config.default_hash_ttl, } } } #[cfg(test)] mod test { use super::*; #[test] fn test_redis_error() { let x = errors::RedisError::ConsumerGroupClaimFailed.to_string(); assert_eq!(x, "Failed to set Redis stream message owner".to_string()) } }