refactor: add hgetall command to redis interface (#5727)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
Chethan Rao
2024-09-10 16:33:41 +05:30
committed by GitHub
parent f4ad6579cc
commit 74ec3f3df3

View File

@ -19,7 +19,8 @@ use fred::{
prelude::RedisErrorKind,
types::{
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
MultipleValues, RedisKey, RedisMap, RedisValue, Scanner, SetOptions, XCap, XReadResponse,
MultipleValues, RedisKey, RedisMap, RedisValue, ScanType, Scanner, SetOptions, XCap,
XReadResponse,
},
};
use futures::StreamExt;
@ -209,6 +210,25 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::DeleteFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn delete_multiple_keys(
&self,
keys: Vec<String>,
) -> CustomResult<Vec<DelReply>, errors::RedisError> {
let mut del_result = Vec::with_capacity(keys.len());
for key in keys {
del_result.push(
self.pool
.del(self.add_prefix(&key))
.await
.change_context(errors::RedisError::DeleteFailed)?,
);
}
Ok(del_result)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_key_with_expiry<V>(
&self,
@ -372,16 +392,25 @@ impl super::RedisConnectionPool {
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn increment_field_in_hash(
pub async fn increment_fields_in_hash<T>(
&self,
key: &str,
field: &str,
increment: i64,
) -> CustomResult<usize, errors::RedisError> {
self.pool
.hincrby(self.add_prefix(key), field, increment)
.await
.change_context(errors::RedisError::IncrementHashFieldFailed)
fields_to_increment: &[(T, i64)],
) -> CustomResult<Vec<usize>, errors::RedisError>
where
T: Debug + ToString,
{
let mut values_after_increment = Vec::with_capacity(fields_to_increment.len());
for (field, increment) in fields_to_increment.iter() {
values_after_increment.push(
self.pool
.hincrby(self.add_prefix(key), field.to_string(), *increment)
.await
.change_context(errors::RedisError::IncrementHashFieldFailed)?,
)
}
Ok(values_after_increment)
}
#[instrument(level = "DEBUG", skip(self))]
@ -405,7 +434,38 @@ impl super::RedisConnectionPool {
Some(futures::stream::iter(v))
}
Err(err) => {
tracing::error!(?err);
tracing::error!(redis_err=?err, "Redis error while executing hscan command");
None
}
}
})
.flatten()
.collect::<Vec<_>>()
.await)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn scan(
&self,
pattern: &str,
count: Option<u32>,
scan_type: Option<ScanType>,
) -> CustomResult<Vec<String>, errors::RedisError> {
Ok(self
.pool
.next()
.scan(&self.add_prefix(pattern), count, scan_type)
.filter_map(|value| async move {
match value {
Ok(mut v) => {
let v = v.take_results()?;
let v: Vec<String> =
v.into_iter().filter_map(|val| val.into_string()).collect();
Some(futures::stream::iter(v))
}
Err(err) => {
tracing::error!(redis_err=?err, "Redis error while executing scan command");
None
}
}
@ -450,6 +510,17 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::GetHashFieldFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_hash_fields<V>(&self, key: &str) -> CustomResult<V, errors::RedisError>
where
V: FromRedis + Unpin + Send + 'static,
{
self.pool
.hgetall(self.add_prefix(key))
.await
.change_context(errors::RedisError::GetHashFieldFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_hash_field_and_deserialize<V>(
&self,