diff --git a/Cargo.lock b/Cargo.lock index 48bc181e28..7bcccf8e69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2495,6 +2495,7 @@ dependencies = [ "router_env", "serde", "thiserror", + "tokio", ] [[package]] diff --git a/crates/redis_interface/Cargo.toml b/crates/redis_interface/Cargo.toml index 88941eaa93..2e10c2950e 100644 --- a/crates/redis_interface/Cargo.toml +++ b/crates/redis_interface/Cargo.toml @@ -16,3 +16,6 @@ thiserror = "1.0.37" # First party crates common_utils = { version = "0.1.0", path = "../common_utils" } router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] } + +[dev-dependencies] +tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] } \ No newline at end of file diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index dfb89b0c1d..ef29c5d867 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -1,6 +1,10 @@ -//! //! An interface to abstract the `fred` commands //! +//! The folder provides generic functions for providing serialization +//! and deserialization while calling redis. +//! It also includes instruments to provide tracing. +//! +//! use std::fmt::Debug; @@ -375,8 +379,6 @@ impl super::RedisConnectionPool { // Consumer Group API - //TODO: Handle RedisEntryId Enum cases which are not valid - //implement xgroup_create_mkstream if needed #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_create( &self, @@ -384,6 +386,14 @@ impl super::RedisConnectionPool { group: &str, id: &RedisEntryId, ) -> CustomResult<(), errors::RedisError> { + if matches!( + id, + RedisEntryId::AutoGeneratedID | RedisEntryId::UndeliveredEntryID + ) { + // FIXME: Replace with utils::when + Err(errors::RedisError::InvalidRedisEntryId)?; + } + self.pool .xgroup_create(stream, group, id, true) .await @@ -464,3 +474,30 @@ impl super::RedisConnectionPool { .change_context(errors::RedisError::ConsumerGroupClaimFailed) } } + +#[cfg(test)] +mod tests { + + use crate::{errors::RedisError, RedisConnectionPool, RedisEntryId, RedisSettings}; + + #[tokio::test] + async fn test_consumer_group_create() { + let redis_conn = RedisConnectionPool::new(&RedisSettings::default()).await; + + let result1 = redis_conn + .consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID) + .await; + let result2 = redis_conn + .consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID) + .await; + + assert!(matches!( + result1.unwrap_err().current_context(), + RedisError::InvalidRedisEntryId + )); + assert!(matches!( + result2.unwrap_err().current_context(), + RedisError::InvalidRedisEntryId + )); + } +} diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs index f6c2549827..54db5663a5 100644 --- a/crates/redis_interface/src/errors.rs +++ b/crates/redis_interface/src/errors.rs @@ -44,4 +44,6 @@ pub enum RedisError { GetHashFieldFailed, #[error("The requested value was not found in Redis")] NotFound, + #[error("Invalid RedisEntryId provided")] + InvalidRedisEntryId, } diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index f9ce437541..e1600a9fec 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -1,5 +1,21 @@ +//! Intermediate module for encapsulate all the redis related functionality +//! +//! Provides structs to represent redis connection and all functions that redis provides and +//! are used in the `router` crate. Abstractions for creating a new connection while also facilitating +//! redis connection pool and configuration based types. +//! +//! # Examples +//! ``` +//! pub mod types; +//! use self::types; +//! +//! #[tokio::main] +//! fn main() -> Result<(), Box> { +//! let redis_conn = RedisConnectionPool::new(types::RedisSettings::default()).await; +//! // ... redis_conn ready to use +//! } +//! ``` #![forbid(unsafe_code)] -// TODO: Add crate & modules documentation for this crate pub mod commands; pub mod errors; diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 311f7a213c..f09d829a9b 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -6,8 +6,8 @@ pub struct RedisSettings { pub host: String, pub port: u16, - pub cluster_urls: Vec, pub cluster_enabled: bool, + pub cluster_urls: Vec, pub use_legacy_version: bool, pub pool_size: usize, pub reconnect_max_attempts: u32, @@ -18,6 +18,23 @@ pub struct RedisSettings { pub stream_read_count: u64, } +impl Default for RedisSettings { + fn default() -> Self { + Self { + host: "127.0.0.1".to_string(), + port: 6379, + cluster_enabled: false, + cluster_urls: vec![], + use_legacy_version: false, + pool_size: 5, + reconnect_max_attempts: 5, + reconnect_delay: 5, + default_ttl: 300, + stream_read_count: 1, + } + } +} + #[derive(Debug)] pub enum RedisEntryId { UserSpecifiedID {