//! //! An interface to abstract the `fred` commands //! use std::fmt::Debug; use common_utils::{ errors::CustomResult, ext_traits::{ByteSliceExt, Encode}, }; use error_stack::{IntoReport, ResultExt}; use fred::{ interfaces::{KeysInterface, StreamsInterface}, types::{ Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings, RedisMap, RedisValue, SetOptions, XReadResponse, }, }; use router_env::{tracing, tracing::instrument}; use crate::{ errors, types::{RedisEntryId, SetNXReply}, }; impl super::RedisConnectionPool { #[instrument(level = "DEBUG", skip(self))] pub async fn set_key(&self, key: &str, value: V) -> CustomResult<(), errors::RedisError> where V: TryInto + Debug, V::Error: Into, { self.pool .set( key, value, Some(Expiration::EX(self.config.default_ttl.into())), None, false, ) .await .into_report() .change_context(errors::RedisError::SetFailed) } pub async fn msetnx(&self, value: V) -> CustomResult where V: TryInto + Debug, V::Error: Into, { self.pool .msetnx::(value) .await .into_report() .change_context(errors::RedisError::SetFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn serialize_and_set_key( &self, key: &str, value: V, ) -> CustomResult<(), errors::RedisError> where V: serde::Serialize + Debug, { let serialized = Encode::::encode_to_vec(&value) .change_context(errors::RedisError::JsonSerializationFailed)?; self.set_key(key, &serialized as &[u8]).await } #[instrument(level = "DEBUG", skip(self))] pub async fn get_key(&self, key: &str) -> CustomResult where V: FromRedis + Unpin + Send + 'static, { self.pool .get(key) .await .into_report() .change_context(errors::RedisError::GetFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn exists(&self, key: &str) -> CustomResult where V: Into + Unpin + Send + 'static, { self.pool .exists(key) .await .into_report() .change_context(errors::RedisError::GetFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn get_and_deserialize_key( &self, key: &str, type_name: &str, ) -> CustomResult where T: serde::de::DeserializeOwned, { let value_bytes = self.get_key::>(key).await?; value_bytes .parse_struct(type_name) .change_context(errors::RedisError::JsonDeserializationFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn delete_key(&self, key: &str) -> CustomResult<(), errors::RedisError> { self.pool .del(key) .await .into_report() .change_context(errors::RedisError::DeleteFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn set_key_with_expiry( &self, key: &str, value: V, seconds: i64, ) -> CustomResult<(), errors::RedisError> where V: TryInto + Debug, V::Error: Into, { self.pool .set(key, value, Some(Expiration::EX(seconds)), None, false) .await .into_report() .change_context(errors::RedisError::SetExFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn set_key_if_not_exist( &self, key: &str, value: V, ) -> CustomResult where V: TryInto + Debug, V::Error: Into, { self.pool .set( key, value, Some(Expiration::EX(self.config.default_ttl.into())), Some(SetOptions::NX), false, ) .await .into_report() .change_context(errors::RedisError::SetFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn set_expiry( &self, key: &str, seconds: i64, ) -> CustomResult<(), errors::RedisError> { self.pool .expire(key, seconds) .await .into_report() .change_context(errors::RedisError::SetExpiryFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn set_expire_at( &self, key: &str, timestamp: i64, ) -> CustomResult<(), errors::RedisError> { self.pool .expire_at(key, timestamp) .await .into_report() .change_context(errors::RedisError::SetExpiryFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn stream_append_entry( &self, stream: &str, entry_id: &RedisEntryId, fields: F, ) -> CustomResult<(), errors::RedisError> where F: TryInto + Debug, F::Error: Into, { self.pool .xadd(stream, false, None, entry_id, fields) .await .into_report() .change_context(errors::RedisError::StreamAppendFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn stream_delete_entries( &self, stream: &str, ids: Ids, ) -> CustomResult where Ids: Into + Debug, { self.pool .xdel(stream, ids) .await .into_report() .change_context(errors::RedisError::StreamDeleteFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn stream_acknowledge_entries( &self, stream: &str, group: &str, ids: Ids, ) -> CustomResult where Ids: Into + Debug, { self.pool .xack(stream, group, ids) .await .into_report() .change_context(errors::RedisError::StreamAcknowledgeFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn stream_read_entries( &self, streams: K, ids: Ids, ) -> CustomResult, errors::RedisError> where K: Into + Debug, Ids: Into + Debug, { self.pool .xread( Some(self.config.default_stream_read_count), None, streams, ids, ) .await .into_report() .change_context(errors::RedisError::StreamReadFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn stream_read_with_options( &self, streams: K, ids: Ids, count: Option, block: Option, // timeout in milliseconds group: Option<(&str, &str)>, // (group_name, consumer_name) ) -> CustomResult>, errors::RedisError> where K: Into + Debug, Ids: Into + Debug, { match group { Some((group_name, consumer_name)) => { self.pool .xreadgroup_map(group_name, consumer_name, count, block, false, streams, ids) .await } None => self.pool.xread_map(count, block, streams, ids).await, } .into_report() .change_context(errors::RedisError::StreamReadFailed) } // Consumer Group API //TODO: Handle RedisEntryId Enum cases which are not valid //implement xgroup_create_mkstream if needed #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_create( &self, stream: &str, group: &str, id: &RedisEntryId, ) -> CustomResult<(), errors::RedisError> { self.pool .xgroup_create(stream, group, id, true) .await .into_report() .change_context(errors::RedisError::ConsumerGroupCreateFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_destroy( &self, stream: &str, group: &str, ) -> CustomResult { self.pool .xgroup_destroy(stream, group) .await .into_report() .change_context(errors::RedisError::ConsumerGroupDestroyFailed) } // the number of pending messages that the consumer had before it was deleted #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_delete_consumer( &self, stream: &str, group: &str, consumer: &str, ) -> CustomResult { self.pool .xgroup_delconsumer(stream, group, consumer) .await .into_report() .change_context(errors::RedisError::ConsumerGroupRemoveConsumerFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_set_last_id( &self, stream: &str, group: &str, id: &RedisEntryId, ) -> CustomResult { self.pool .xgroup_setid(stream, group, id) .await .into_report() .change_context(errors::RedisError::ConsumerGroupSetIdFailed) } #[instrument(level = "DEBUG", skip(self))] pub async fn consumer_group_set_message_owner( &self, stream: &str, group: &str, consumer: &str, min_idle_time: u64, ids: Ids, ) -> CustomResult where Ids: Into + Debug, R: FromRedis + Unpin + Send + 'static, { self.pool .xclaim( stream, group, consumer, min_idle_time, ids, None, None, None, false, false, ) .await .into_report() .change_context(errors::RedisError::ConsumerGroupClaimFailed) } }