diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index ccfeb3e5b1..9b76d8d9c8 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -15,7 +15,7 @@ use common_utils::{ }; use error_stack::{report, ResultExt}; use fred::{ - interfaces::{HashesInterface, KeysInterface, SetsInterface, StreamsInterface}, + interfaces::{HashesInterface, KeysInterface, ListInterface, SetsInterface, StreamsInterface}, prelude::RedisErrorKind, types::{ Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings, @@ -371,6 +371,19 @@ impl super::RedisConnectionPool { Ok(hsetnx) } + #[instrument(level = "DEBUG", skip(self))] + pub async fn increment_field_in_hash( + &self, + key: &str, + field: &str, + increment: i64, + ) -> CustomResult { + self.pool + .hincrby(self.add_prefix(key), field, increment) + .await + .change_context(errors::RedisError::IncrementHashFieldFailed) + } + #[instrument(level = "DEBUG", skip(self))] pub async fn hscan( &self, @@ -630,6 +643,55 @@ impl super::RedisConnectionPool { }) } + #[instrument(level = "DEBUG", skip(self))] + pub async fn append_elements_to_list( + &self, + key: &str, + elements: V, + ) -> CustomResult<(), errors::RedisError> + where + V: TryInto + Debug + Send, + V::Error: Into + Send, + { + self.pool + .rpush(self.add_prefix(key), elements) + .await + .change_context(errors::RedisError::AppendElementsToListFailed) + } + + #[instrument(level = "DEBUG", skip(self))] + pub async fn get_list_elements( + &self, + key: &str, + start: i64, + stop: i64, + ) -> CustomResult, errors::RedisError> { + self.pool + .lrange(self.add_prefix(key), start, stop) + .await + .change_context(errors::RedisError::GetListElementsFailed) + } + + #[instrument(level = "DEBUG", skip(self))] + pub async fn get_list_length(&self, key: &str) -> CustomResult { + self.pool + .llen(self.add_prefix(key)) + .await + .change_context(errors::RedisError::GetListLengthFailed) + } + + #[instrument(level = "DEBUG", skip(self))] + pub async fn lpop_list_elements( + &self, + key: &str, + count: Option, + ) -> CustomResult, errors::RedisError> { + self.pool + .lpop(self.add_prefix(key), count) + .await + .change_context(errors::RedisError::PopListElementsFailed) + } + // Consumer Group API #[instrument(level = "DEBUG", skip(self))] diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs index cc7f8bdd21..0e2a4b8d63 100644 --- a/crates/redis_interface/src/errors.rs +++ b/crates/redis_interface/src/errors.rs @@ -68,4 +68,14 @@ pub enum RedisError { OnMessageError, #[error("Got an unknown result from redis")] UnknownResult, + #[error("Failed to append elements to list in Redis")] + AppendElementsToListFailed, + #[error("Failed to get list elements in Redis")] + GetListElementsFailed, + #[error("Failed to get length of list")] + GetListLengthFailed, + #[error("Failed to pop list elements in Redis")] + PopListElementsFailed, + #[error("Failed to increment hash field in Redis")] + IncrementHashFieldFailed, }