Todo redis interface (#136)

This commit is contained in:
Nishant Joshi
2022-12-13 18:38:11 +05:30
committed by GitHub
parent 6d30989f59
commit 9fe3f7d904
6 changed files with 81 additions and 5 deletions

1
Cargo.lock generated
View File

@ -2495,6 +2495,7 @@ dependencies = [
"router_env",
"serde",
"thiserror",
"tokio",
]
[[package]]

View File

@ -16,3 +16,6 @@ thiserror = "1.0.37"
# First party crates
common_utils = { version = "0.1.0", path = "../common_utils" }
router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
[dev-dependencies]
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }

View File

@ -1,6 +1,10 @@
//!
//! An interface to abstract the `fred` commands
//!
//! The folder provides generic functions for providing serialization
//! and deserialization while calling redis.
//! It also includes instruments to provide tracing.
//!
//!
use std::fmt::Debug;
@ -375,8 +379,6 @@ impl super::RedisConnectionPool {
// Consumer Group API
//TODO: Handle RedisEntryId Enum cases which are not valid
//implement xgroup_create_mkstream if needed
#[instrument(level = "DEBUG", skip(self))]
pub async fn consumer_group_create(
&self,
@ -384,6 +386,14 @@ impl super::RedisConnectionPool {
group: &str,
id: &RedisEntryId,
) -> CustomResult<(), errors::RedisError> {
if matches!(
id,
RedisEntryId::AutoGeneratedID | RedisEntryId::UndeliveredEntryID
) {
// FIXME: Replace with utils::when
Err(errors::RedisError::InvalidRedisEntryId)?;
}
self.pool
.xgroup_create(stream, group, id, true)
.await
@ -464,3 +474,30 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::ConsumerGroupClaimFailed)
}
}
#[cfg(test)]
mod tests {
use crate::{errors::RedisError, RedisConnectionPool, RedisEntryId, RedisSettings};
#[tokio::test]
async fn test_consumer_group_create() {
let redis_conn = RedisConnectionPool::new(&RedisSettings::default()).await;
let result1 = redis_conn
.consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID)
.await;
let result2 = redis_conn
.consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID)
.await;
assert!(matches!(
result1.unwrap_err().current_context(),
RedisError::InvalidRedisEntryId
));
assert!(matches!(
result2.unwrap_err().current_context(),
RedisError::InvalidRedisEntryId
));
}
}

View File

@ -44,4 +44,6 @@ pub enum RedisError {
GetHashFieldFailed,
#[error("The requested value was not found in Redis")]
NotFound,
#[error("Invalid RedisEntryId provided")]
InvalidRedisEntryId,
}

View File

@ -1,5 +1,21 @@
//! Intermediate module for encapsulate all the redis related functionality
//!
//! Provides structs to represent redis connection and all functions that redis provides and
//! are used in the `router` crate. Abstractions for creating a new connection while also facilitating
//! redis connection pool and configuration based types.
//!
//! # Examples
//! ```
//! pub mod types;
//! use self::types;
//!
//! #[tokio::main]
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let redis_conn = RedisConnectionPool::new(types::RedisSettings::default()).await;
//! // ... redis_conn ready to use
//! }
//! ```
#![forbid(unsafe_code)]
// TODO: Add crate & modules documentation for this crate
pub mod commands;
pub mod errors;

View File

@ -6,8 +6,8 @@
pub struct RedisSettings {
pub host: String,
pub port: u16,
pub cluster_urls: Vec<String>,
pub cluster_enabled: bool,
pub cluster_urls: Vec<String>,
pub use_legacy_version: bool,
pub pool_size: usize,
pub reconnect_max_attempts: u32,
@ -18,6 +18,23 @@ pub struct RedisSettings {
pub stream_read_count: u64,
}
impl Default for RedisSettings {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 6379,
cluster_enabled: false,
cluster_urls: vec![],
use_legacy_version: false,
pool_size: 5,
reconnect_max_attempts: 5,
reconnect_delay: 5,
default_ttl: 300,
stream_read_count: 1,
}
}
}
#[derive(Debug)]
pub enum RedisEntryId {
UserSpecifiedID {