feat(redis_interface): implement functions for HGET, HSET, HSETNX commands (#114)

This commit is contained in:
Sanchith Hegde
2022-12-12 15:32:46 +05:30
committed by GitHub
parent 01cafe753b
commit 81593e0a6b
8 changed files with 157 additions and 80 deletions

View File

@ -10,7 +10,7 @@ use common_utils::{
};
use error_stack::{IntoReport, ResultExt};
use fred::{
interfaces::{KeysInterface, StreamsInterface},
interfaces::{HashesInterface, KeysInterface, StreamsInterface},
types::{
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
RedisMap, RedisValue, SetOptions, XReadResponse,
@ -185,6 +185,111 @@ impl super::RedisConnectionPool {
.into_report()
.change_context(errors::RedisError::SetExpiryFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_hash_fields<V>(
&self,
key: &str,
values: V,
) -> CustomResult<(), errors::RedisError>
where
V: TryInto<RedisMap> + Debug,
V::Error: Into<fred::error::RedisError>,
{
self.pool
.hset(key, values)
.await
.into_report()
.change_context(errors::RedisError::SetHashFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn serialize_and_set_hash_fields<V>(
&self,
key: &str,
values: V,
) -> CustomResult<(), errors::RedisError>
where
V: serde::Serialize + Debug,
{
let serialized = Encode::<V>::encode_to_value(&values)
.change_context(errors::RedisError::JsonSerializationFailed)?;
self.set_hash_fields(key, serialized).await
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_hash_field_if_not_exist<V>(
&self,
key: &str,
field: &str,
value: V,
) -> CustomResult<SetNXReply, errors::RedisError>
where
V: TryInto<RedisValue> + Debug,
V::Error: Into<fred::error::RedisError>,
{
self.pool
.hsetnx(key, field, value)
.await
.into_report()
.change_context(errors::RedisError::SetHashFieldFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn serialize_and_set_hash_field_if_not_exist<V>(
&self,
key: &str,
field: &str,
value: V,
) -> CustomResult<SetNXReply, errors::RedisError>
where
V: serde::Serialize + Debug,
{
let serialized = Encode::<V>::encode_to_vec(&value)
.change_context(errors::RedisError::JsonSerializationFailed)?;
self.set_hash_field_if_not_exist(key, field, &serialized as &[u8])
.await
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_hash_field<V>(
&self,
key: &str,
field: &str,
) -> CustomResult<V, errors::RedisError>
where
V: FromRedis + Unpin + Send + 'static,
{
self.pool
.hget(key, field)
.await
.into_report()
.change_context(errors::RedisError::GetHashFieldFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_hash_field_and_deserialize<V>(
&self,
key: &str,
field: &str,
type_name: &str,
) -> CustomResult<V, errors::RedisError>
where
V: serde::de::DeserializeOwned,
{
let value_bytes = self.get_hash_field::<Vec<u8>>(key, field).await?;
if value_bytes.is_empty() {
return Err(errors::RedisError::NotFound.into());
}
value_bytes
.parse_struct(type_name)
.change_context(errors::RedisError::JsonDeserializationFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn stream_append_entry<F>(
&self,