mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-27 19:46:48 +08:00
fix(multitenancy): add a fallback for get commands in redis (#7043)
This commit is contained in:
@ -17,8 +17,7 @@ use fred::{
|
||||
prelude::{LuaInterface, RedisErrorKind},
|
||||
types::{
|
||||
Expiration, FromRedis, MultipleIDs, MultipleKeys, MultipleOrderedPairs, MultipleStrings,
|
||||
MultipleValues, RedisKey, RedisMap, RedisValue, ScanType, Scanner, SetOptions, XCap,
|
||||
XReadResponse,
|
||||
MultipleValues, RedisMap, RedisValue, ScanType, Scanner, SetOptions, XCap, XReadResponse,
|
||||
},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
@ -26,7 +25,7 @@ use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
errors,
|
||||
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SaddReply, SetnxReply},
|
||||
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, RedisKey, SaddReply, SetnxReply},
|
||||
};
|
||||
|
||||
impl super::RedisConnectionPool {
|
||||
@ -37,15 +36,16 @@ impl super::RedisConnectionPool {
|
||||
format!("{}:{}", self.key_prefix, key)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key<V>(&self, key: &str, value: V) -> CustomResult<(), errors::RedisError>
|
||||
pub async fn set_key<V>(&self, key: &RedisKey, value: V) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
V: TryInto<RedisValue> + Debug + Send + Sync,
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
self.add_prefix(key),
|
||||
key.tenant_aware_key(self),
|
||||
value,
|
||||
Some(Expiration::EX(self.config.default_ttl.into())),
|
||||
None,
|
||||
@ -57,7 +57,7 @@ impl super::RedisConnectionPool {
|
||||
|
||||
pub async fn set_key_without_modifying_ttl<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
@ -65,7 +65,13 @@ impl super::RedisConnectionPool {
|
||||
V::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.set(key, value, Some(Expiration::KEEPTTL), None, false)
|
||||
.set(
|
||||
key.tenant_aware_key(self),
|
||||
value,
|
||||
Some(Expiration::KEEPTTL),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetFailed)
|
||||
}
|
||||
@ -87,7 +93,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_key_if_not_exist<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
ttl: Option<i64>,
|
||||
) -> CustomResult<SetnxReply, errors::RedisError>
|
||||
@ -104,7 +110,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_key<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
@ -120,7 +126,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_key_without_modifying_ttl<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
@ -137,7 +143,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_key_with_expiry<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
@ -150,7 +156,7 @@ impl super::RedisConnectionPool {
|
||||
|
||||
self.pool
|
||||
.set(
|
||||
self.add_prefix(key),
|
||||
key.tenant_aware_key(self),
|
||||
serialized.as_slice(),
|
||||
Some(Expiration::EX(seconds)),
|
||||
None,
|
||||
@ -161,31 +167,67 @@ impl super::RedisConnectionPool {
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_key<V>(&self, key: &str) -> CustomResult<V, errors::RedisError>
|
||||
pub async fn get_key<V>(&self, key: &RedisKey) -> CustomResult<V, errors::RedisError>
|
||||
where
|
||||
V: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.get(self.add_prefix(key))
|
||||
match self
|
||||
.pool
|
||||
.get(key.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetFailed)
|
||||
{
|
||||
Ok(v) => Ok(v),
|
||||
Err(_err) => {
|
||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||
{
|
||||
Err(_err)
|
||||
}
|
||||
|
||||
#[cfg(feature = "multitenancy_fallback")]
|
||||
{
|
||||
self.pool
|
||||
.get(key.tenant_unaware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn exists<V>(&self, key: &str) -> CustomResult<bool, errors::RedisError>
|
||||
pub async fn exists<V>(&self, key: &RedisKey) -> CustomResult<bool, errors::RedisError>
|
||||
where
|
||||
V: Into<MultipleKeys> + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.exists(self.add_prefix(key))
|
||||
match self
|
||||
.pool
|
||||
.exists(key.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetFailed)
|
||||
{
|
||||
Ok(v) => Ok(v),
|
||||
Err(_err) => {
|
||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||
{
|
||||
Err(_err)
|
||||
}
|
||||
|
||||
#[cfg(feature = "multitenancy_fallback")]
|
||||
{
|
||||
self.pool
|
||||
.exists(key.tenant_unaware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_and_deserialize_key<T>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
type_name: &'static str,
|
||||
) -> CustomResult<T, errors::RedisError>
|
||||
where
|
||||
@ -201,19 +243,37 @@ impl super::RedisConnectionPool {
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn delete_key(&self, key: &str) -> CustomResult<DelReply, errors::RedisError> {
|
||||
self.pool
|
||||
.del(self.add_prefix(key))
|
||||
pub async fn delete_key(&self, key: &RedisKey) -> CustomResult<DelReply, errors::RedisError> {
|
||||
match self
|
||||
.pool
|
||||
.del(key.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::DeleteFailed)
|
||||
{
|
||||
Ok(v) => Ok(v),
|
||||
Err(_err) => {
|
||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||
{
|
||||
Err(_err)
|
||||
}
|
||||
|
||||
#[cfg(feature = "multitenancy_fallback")]
|
||||
{
|
||||
self.pool
|
||||
.del(key.tenant_unaware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::DeleteFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn delete_multiple_keys(
|
||||
&self,
|
||||
keys: &[String],
|
||||
keys: &[RedisKey],
|
||||
) -> CustomResult<Vec<DelReply>, errors::RedisError> {
|
||||
let futures = keys.iter().map(|key| self.pool.del(self.add_prefix(key)));
|
||||
let futures = keys.iter().map(|key| self.delete_key(key));
|
||||
|
||||
let del_result = futures::future::try_join_all(futures)
|
||||
.await
|
||||
@ -225,7 +285,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key_with_expiry<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
@ -235,7 +295,7 @@ impl super::RedisConnectionPool {
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
self.add_prefix(key),
|
||||
key.tenant_aware_key(self),
|
||||
value,
|
||||
Some(Expiration::EX(seconds)),
|
||||
None,
|
||||
@ -248,7 +308,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_key_if_not_exists_with_expiry<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
value: V,
|
||||
seconds: Option<i64>,
|
||||
) -> CustomResult<SetnxReply, errors::RedisError>
|
||||
@ -258,7 +318,7 @@ impl super::RedisConnectionPool {
|
||||
{
|
||||
self.pool
|
||||
.set(
|
||||
self.add_prefix(key),
|
||||
key.tenant_aware_key(self),
|
||||
value,
|
||||
Some(Expiration::EX(
|
||||
seconds.unwrap_or(self.config.default_ttl.into()),
|
||||
@ -273,11 +333,11 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_expiry(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
seconds: i64,
|
||||
) -> CustomResult<(), errors::RedisError> {
|
||||
self.pool
|
||||
.expire(self.add_prefix(key), seconds)
|
||||
.expire(key.tenant_aware_key(self), seconds)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetExpiryFailed)
|
||||
}
|
||||
@ -285,11 +345,11 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_expire_at(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
timestamp: i64,
|
||||
) -> CustomResult<(), errors::RedisError> {
|
||||
self.pool
|
||||
.expire_at(self.add_prefix(key), timestamp)
|
||||
.expire_at(key.tenant_aware_key(self), timestamp)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetExpiryFailed)
|
||||
}
|
||||
@ -297,7 +357,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_hash_fields<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
values: V,
|
||||
ttl: Option<i64>,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
@ -307,7 +367,7 @@ impl super::RedisConnectionPool {
|
||||
{
|
||||
let output: Result<(), _> = self
|
||||
.pool
|
||||
.hset(self.add_prefix(key), values)
|
||||
.hset(key.tenant_aware_key(self), values)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetHashFailed);
|
||||
// setting expiry for the key
|
||||
@ -321,7 +381,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn set_hash_field_if_not_exist<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
field: &str,
|
||||
value: V,
|
||||
ttl: Option<u32>,
|
||||
@ -332,7 +392,7 @@ impl super::RedisConnectionPool {
|
||||
{
|
||||
let output: Result<HsetnxReply, _> = self
|
||||
.pool
|
||||
.hsetnx(self.add_prefix(key), field, value)
|
||||
.hsetnx(key.tenant_aware_key(self), field, value)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetHashFieldFailed);
|
||||
|
||||
@ -348,7 +408,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_hash_field_if_not_exist<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
field: &str,
|
||||
value: V,
|
||||
ttl: Option<u32>,
|
||||
@ -367,7 +427,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn serialize_and_set_multiple_hash_field_if_not_exist<V>(
|
||||
&self,
|
||||
kv: &[(&str, V)],
|
||||
kv: &[(&RedisKey, V)],
|
||||
field: &str,
|
||||
ttl: Option<u32>,
|
||||
) -> CustomResult<Vec<HsetnxReply>, errors::RedisError>
|
||||
@ -387,7 +447,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn increment_fields_in_hash<T>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
fields_to_increment: &[(T, i64)],
|
||||
) -> CustomResult<Vec<usize>, errors::RedisError>
|
||||
where
|
||||
@ -397,7 +457,7 @@ impl super::RedisConnectionPool {
|
||||
for (field, increment) in fields_to_increment.iter() {
|
||||
values_after_increment.push(
|
||||
self.pool
|
||||
.hincrby(self.add_prefix(key), field.to_string(), *increment)
|
||||
.hincrby(key.tenant_aware_key(self), field.to_string(), *increment)
|
||||
.await
|
||||
.change_context(errors::RedisError::IncrementHashFieldFailed)?,
|
||||
)
|
||||
@ -409,14 +469,14 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn hscan(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
pattern: &str,
|
||||
count: Option<u32>,
|
||||
) -> CustomResult<Vec<String>, errors::RedisError> {
|
||||
Ok(self
|
||||
.pool
|
||||
.next()
|
||||
.hscan::<&str, &str>(&self.add_prefix(key), pattern, count)
|
||||
.hscan::<&str, &str>(&key.tenant_aware_key(self), pattern, count)
|
||||
.filter_map(|value| async move {
|
||||
match value {
|
||||
Ok(mut v) => {
|
||||
@ -440,14 +500,14 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn scan(
|
||||
&self,
|
||||
pattern: &str,
|
||||
pattern: &RedisKey,
|
||||
count: Option<u32>,
|
||||
scan_type: Option<ScanType>,
|
||||
) -> CustomResult<Vec<String>, errors::RedisError> {
|
||||
Ok(self
|
||||
.pool
|
||||
.next()
|
||||
.scan(&self.add_prefix(pattern), count, scan_type)
|
||||
.scan(pattern.tenant_aware_key(self), count, scan_type)
|
||||
.filter_map(|value| async move {
|
||||
match value {
|
||||
Ok(mut v) => {
|
||||
@ -471,7 +531,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn hscan_and_deserialize<T>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
pattern: &str,
|
||||
count: Option<u32>,
|
||||
) -> CustomResult<Vec<T>, errors::RedisError>
|
||||
@ -491,33 +551,69 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_hash_field<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
field: &str,
|
||||
) -> CustomResult<V, errors::RedisError>
|
||||
where
|
||||
V: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.hget(self.add_prefix(key), field)
|
||||
match self
|
||||
.pool
|
||||
.hget(key.tenant_aware_key(self), field)
|
||||
.await
|
||||
.change_context(errors::RedisError::GetHashFieldFailed)
|
||||
{
|
||||
Ok(v) => Ok(v),
|
||||
Err(_err) => {
|
||||
#[cfg(feature = "multitenancy_fallback")]
|
||||
{
|
||||
self.pool
|
||||
.hget(key.tenant_unaware_key(self), field)
|
||||
.await
|
||||
.change_context(errors::RedisError::GetHashFieldFailed)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||
{
|
||||
Err(_err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_hash_fields<V>(&self, key: &str) -> CustomResult<V, errors::RedisError>
|
||||
pub async fn get_hash_fields<V>(&self, key: &RedisKey) -> CustomResult<V, errors::RedisError>
|
||||
where
|
||||
V: FromRedis + Unpin + Send + 'static,
|
||||
{
|
||||
self.pool
|
||||
.hgetall(self.add_prefix(key))
|
||||
match self
|
||||
.pool
|
||||
.hgetall(key.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetHashFieldFailed)
|
||||
{
|
||||
Ok(v) => Ok(v),
|
||||
Err(_err) => {
|
||||
#[cfg(feature = "multitenancy_fallback")]
|
||||
{
|
||||
self.pool
|
||||
.hgetall(key.tenant_unaware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetHashFieldFailed)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||
{
|
||||
Err(_err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_hash_field_and_deserialize<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
field: &str,
|
||||
type_name: &'static str,
|
||||
) -> CustomResult<V, errors::RedisError>
|
||||
@ -538,7 +634,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn sadd<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
members: V,
|
||||
) -> CustomResult<SaddReply, errors::RedisError>
|
||||
where
|
||||
@ -546,7 +642,7 @@ impl super::RedisConnectionPool {
|
||||
V::Error: Into<fred::error::RedisError> + Send,
|
||||
{
|
||||
self.pool
|
||||
.sadd(self.add_prefix(key), members)
|
||||
.sadd(key.tenant_aware_key(self), members)
|
||||
.await
|
||||
.change_context(errors::RedisError::SetAddMembersFailed)
|
||||
}
|
||||
@ -554,7 +650,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_append_entry<F>(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
entry_id: &RedisEntryId,
|
||||
fields: F,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
@ -563,7 +659,7 @@ impl super::RedisConnectionPool {
|
||||
F::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xadd(self.add_prefix(stream), false, None, entry_id, fields)
|
||||
.xadd(stream.tenant_aware_key(self), false, None, entry_id, fields)
|
||||
.await
|
||||
.change_context(errors::RedisError::StreamAppendFailed)
|
||||
}
|
||||
@ -571,14 +667,14 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_delete_entries<Ids>(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
Ids: Into<MultipleStrings> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xdel(self.add_prefix(stream), ids)
|
||||
.xdel(stream.tenant_aware_key(self), ids)
|
||||
.await
|
||||
.change_context(errors::RedisError::StreamDeleteFailed)
|
||||
}
|
||||
@ -586,7 +682,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_trim_entries<C>(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
xcap: C,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
where
|
||||
@ -594,7 +690,7 @@ impl super::RedisConnectionPool {
|
||||
C::Error: Into<fred::error::RedisError> + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xtrim(self.add_prefix(stream), xcap)
|
||||
.xtrim(stream.tenant_aware_key(self), xcap)
|
||||
.await
|
||||
.change_context(errors::RedisError::StreamTrimFailed)
|
||||
}
|
||||
@ -602,7 +698,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_acknowledge_entries<Ids>(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
ids: Ids,
|
||||
) -> CustomResult<usize, errors::RedisError>
|
||||
@ -610,15 +706,18 @@ impl super::RedisConnectionPool {
|
||||
Ids: Into<MultipleIDs> + Debug + Send + Sync,
|
||||
{
|
||||
self.pool
|
||||
.xack(self.add_prefix(stream), group, ids)
|
||||
.xack(stream.tenant_aware_key(self), group, ids)
|
||||
.await
|
||||
.change_context(errors::RedisError::StreamAcknowledgeFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn stream_get_length(&self, stream: &str) -> CustomResult<usize, errors::RedisError> {
|
||||
pub async fn stream_get_length(
|
||||
&self,
|
||||
stream: &RedisKey,
|
||||
) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.xlen(self.add_prefix(stream))
|
||||
.xlen(stream.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetLengthFailed)
|
||||
}
|
||||
@ -631,10 +730,10 @@ impl super::RedisConnectionPool {
|
||||
let res = multiple_keys
|
||||
.inner()
|
||||
.iter()
|
||||
.filter_map(|key| key.as_str())
|
||||
.map(|k| self.add_prefix(k))
|
||||
.map(RedisKey::from)
|
||||
.filter_map(|key| key.as_str().map(RedisKey::from))
|
||||
.map(|k: RedisKey| k.tenant_aware_key(self))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
MultipleKeys::from(res)
|
||||
}
|
||||
|
||||
@ -710,7 +809,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn append_elements_to_list<V>(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
elements: V,
|
||||
) -> CustomResult<(), errors::RedisError>
|
||||
where
|
||||
@ -718,7 +817,7 @@ impl super::RedisConnectionPool {
|
||||
V::Error: Into<fred::error::RedisError> + Send,
|
||||
{
|
||||
self.pool
|
||||
.rpush(self.add_prefix(key), elements)
|
||||
.rpush(key.tenant_aware_key(self), elements)
|
||||
.await
|
||||
.change_context(errors::RedisError::AppendElementsToListFailed)
|
||||
}
|
||||
@ -726,20 +825,20 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_list_elements(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
start: i64,
|
||||
stop: i64,
|
||||
) -> CustomResult<Vec<String>, errors::RedisError> {
|
||||
self.pool
|
||||
.lrange(self.add_prefix(key), start, stop)
|
||||
.lrange(key.tenant_aware_key(self), start, stop)
|
||||
.await
|
||||
.change_context(errors::RedisError::GetListElementsFailed)
|
||||
}
|
||||
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn get_list_length(&self, key: &str) -> CustomResult<usize, errors::RedisError> {
|
||||
pub async fn get_list_length(&self, key: &RedisKey) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.llen(self.add_prefix(key))
|
||||
.llen(key.tenant_aware_key(self))
|
||||
.await
|
||||
.change_context(errors::RedisError::GetListLengthFailed)
|
||||
}
|
||||
@ -747,11 +846,11 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn lpop_list_elements(
|
||||
&self,
|
||||
key: &str,
|
||||
key: &RedisKey,
|
||||
count: Option<usize>,
|
||||
) -> CustomResult<Vec<String>, errors::RedisError> {
|
||||
self.pool
|
||||
.lpop(self.add_prefix(key), count)
|
||||
.lpop(key.tenant_aware_key(self), count)
|
||||
.await
|
||||
.change_context(errors::RedisError::PopListElementsFailed)
|
||||
}
|
||||
@ -761,7 +860,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_create(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
id: &RedisEntryId,
|
||||
) -> CustomResult<(), errors::RedisError> {
|
||||
@ -774,7 +873,7 @@ impl super::RedisConnectionPool {
|
||||
}
|
||||
|
||||
self.pool
|
||||
.xgroup_create(self.add_prefix(stream), group, id, true)
|
||||
.xgroup_create(stream.tenant_aware_key(self), group, id, true)
|
||||
.await
|
||||
.change_context(errors::RedisError::ConsumerGroupCreateFailed)
|
||||
}
|
||||
@ -782,11 +881,11 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_destroy(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_destroy(self.add_prefix(stream), group)
|
||||
.xgroup_destroy(stream.tenant_aware_key(self), group)
|
||||
.await
|
||||
.change_context(errors::RedisError::ConsumerGroupDestroyFailed)
|
||||
}
|
||||
@ -795,12 +894,12 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_delete_consumer(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
) -> CustomResult<usize, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_delconsumer(self.add_prefix(stream), group, consumer)
|
||||
.xgroup_delconsumer(stream.tenant_aware_key(self), group, consumer)
|
||||
.await
|
||||
.change_context(errors::RedisError::ConsumerGroupRemoveConsumerFailed)
|
||||
}
|
||||
@ -808,12 +907,12 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_set_last_id(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
id: &RedisEntryId,
|
||||
) -> CustomResult<String, errors::RedisError> {
|
||||
self.pool
|
||||
.xgroup_setid(self.add_prefix(stream), group, id)
|
||||
.xgroup_setid(stream.tenant_aware_key(self), group, id)
|
||||
.await
|
||||
.change_context(errors::RedisError::ConsumerGroupSetIdFailed)
|
||||
}
|
||||
@ -821,7 +920,7 @@ impl super::RedisConnectionPool {
|
||||
#[instrument(level = "DEBUG", skip(self))]
|
||||
pub async fn consumer_group_set_message_owner<Ids, R>(
|
||||
&self,
|
||||
stream: &str,
|
||||
stream: &RedisKey,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
min_idle_time: u64,
|
||||
@ -833,7 +932,7 @@ impl super::RedisConnectionPool {
|
||||
{
|
||||
self.pool
|
||||
.xclaim(
|
||||
self.add_prefix(stream),
|
||||
stream.tenant_aware_key(self),
|
||||
group,
|
||||
consumer,
|
||||
min_idle_time,
|
||||
@ -888,11 +987,15 @@ mod tests {
|
||||
|
||||
// Act
|
||||
let result1 = redis_conn
|
||||
.consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID)
|
||||
.consumer_group_create(&"TEST1".into(), "GTEST", &RedisEntryId::AutoGeneratedID)
|
||||
.await;
|
||||
|
||||
let result2 = redis_conn
|
||||
.consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID)
|
||||
.consumer_group_create(
|
||||
&"TEST3".into(),
|
||||
"GTEST",
|
||||
&RedisEntryId::UndeliveredEntryID,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Assert Setup
|
||||
@ -914,10 +1017,10 @@ mod tests {
|
||||
let pool = RedisConnectionPool::new(&RedisSettings::default())
|
||||
.await
|
||||
.expect("failed to create redis connection pool");
|
||||
let _ = pool.set_key("key", "value".to_string()).await;
|
||||
let _ = pool.set_key(&"key".into(), "value".to_string()).await;
|
||||
|
||||
// Act
|
||||
let result = pool.delete_key("key").await;
|
||||
let result = pool.delete_key(&"key".into()).await;
|
||||
|
||||
// Assert setup
|
||||
result.is_ok()
|
||||
@ -938,7 +1041,7 @@ mod tests {
|
||||
.expect("failed to create redis connection pool");
|
||||
|
||||
// Act
|
||||
let result = pool.delete_key("key not exists").await;
|
||||
let result = pool.delete_key(&"key not exists".into()).await;
|
||||
|
||||
// Assert Setup
|
||||
result.is_ok()
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
use common_utils::errors::CustomResult;
|
||||
use fred::types::RedisValue as FredRedisValue;
|
||||
|
||||
use crate::errors;
|
||||
use crate::{errors, RedisConnectionPool};
|
||||
|
||||
pub struct RedisValue {
|
||||
inner: FredRedisValue,
|
||||
@ -293,3 +293,24 @@ impl fred::types::FromRedis for SaddReply {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RedisKey(String);
|
||||
|
||||
impl RedisKey {
|
||||
pub fn tenant_aware_key(&self, pool: &RedisConnectionPool) -> String {
|
||||
pool.add_prefix(&self.0)
|
||||
}
|
||||
|
||||
pub fn tenant_unaware_key(&self, _pool: &RedisConnectionPool) -> String {
|
||||
self.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<str>> From<T> for RedisKey {
|
||||
fn from(value: T) -> Self {
|
||||
let value = value.as_ref();
|
||||
|
||||
Self(value.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user