mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-11-02 21:07:58 +08:00
fix(redis): add support for fetching multiple keys for redis cluster (#9019)
This commit is contained in:
@ -199,7 +199,7 @@ impl super::RedisConnectionPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(level = "DEBUG", skip(self))]
|
#[instrument(level = "DEBUG", skip(self))]
|
||||||
pub async fn get_multiple_keys<V>(
|
async fn get_multiple_keys_with_mget<V>(
|
||||||
&self,
|
&self,
|
||||||
keys: &[RedisKey],
|
keys: &[RedisKey],
|
||||||
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
|
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
|
||||||
@ -212,13 +212,69 @@ impl super::RedisConnectionPool {
|
|||||||
|
|
||||||
let tenant_aware_keys: Vec<String> =
|
let tenant_aware_keys: Vec<String> =
|
||||||
keys.iter().map(|key| key.tenant_aware_key(self)).collect();
|
keys.iter().map(|key| key.tenant_aware_key(self)).collect();
|
||||||
|
self.pool
|
||||||
match self
|
.mget(tenant_aware_keys)
|
||||||
.pool
|
|
||||||
.mget(tenant_aware_keys.clone())
|
|
||||||
.await
|
.await
|
||||||
.change_context(errors::RedisError::GetFailed)
|
.change_context(errors::RedisError::GetFailed)
|
||||||
{
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "DEBUG", skip(self))]
|
||||||
|
async fn get_multiple_keys_with_parallel_get<V>(
|
||||||
|
&self,
|
||||||
|
keys: &[RedisKey],
|
||||||
|
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
|
||||||
|
where
|
||||||
|
V: FromRedis + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
if keys.is_empty() {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
let tenant_aware_keys: Vec<String> =
|
||||||
|
keys.iter().map(|key| key.tenant_aware_key(self)).collect();
|
||||||
|
|
||||||
|
let futures = tenant_aware_keys
|
||||||
|
.iter()
|
||||||
|
.map(|redis_key| self.pool.get::<Option<V>, _>(redis_key));
|
||||||
|
|
||||||
|
let results = futures::future::try_join_all(futures)
|
||||||
|
.await
|
||||||
|
.change_context(errors::RedisError::GetFailed)
|
||||||
|
.attach_printable("Failed to get keys in cluster mode")?;
|
||||||
|
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper method to encapsulate the logic for choosing between cluster and non-cluster modes
|
||||||
|
#[instrument(level = "DEBUG", skip(self))]
|
||||||
|
async fn get_keys_by_mode<V>(
|
||||||
|
&self,
|
||||||
|
keys: &[RedisKey],
|
||||||
|
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
|
||||||
|
where
|
||||||
|
V: FromRedis + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
if self.config.cluster_enabled {
|
||||||
|
// Use individual GET commands for cluster mode to avoid CROSSSLOT errors
|
||||||
|
self.get_multiple_keys_with_parallel_get(keys).await
|
||||||
|
} else {
|
||||||
|
// Use MGET for non-cluster mode for better performance
|
||||||
|
self.get_multiple_keys_with_mget(keys).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(level = "DEBUG", skip(self))]
|
||||||
|
pub async fn get_multiple_keys<V>(
|
||||||
|
&self,
|
||||||
|
keys: &[RedisKey],
|
||||||
|
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
|
||||||
|
where
|
||||||
|
V: FromRedis + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
if keys.is_empty() {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.get_keys_by_mode(keys).await {
|
||||||
Ok(values) => Ok(values),
|
Ok(values) => Ok(values),
|
||||||
Err(_err) => {
|
Err(_err) => {
|
||||||
#[cfg(not(feature = "multitenancy_fallback"))]
|
#[cfg(not(feature = "multitenancy_fallback"))]
|
||||||
@ -228,15 +284,12 @@ impl super::RedisConnectionPool {
|
|||||||
|
|
||||||
#[cfg(feature = "multitenancy_fallback")]
|
#[cfg(feature = "multitenancy_fallback")]
|
||||||
{
|
{
|
||||||
let tenant_unaware_keys: Vec<String> = keys
|
let tenant_unaware_keys: Vec<RedisKey> = keys
|
||||||
.iter()
|
.iter()
|
||||||
.map(|key| key.tenant_unaware_key(self))
|
.map(|key| key.tenant_unaware_key(self).into())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
self.pool
|
self.get_keys_by_mode(&tenant_unaware_keys).await
|
||||||
.mget(tenant_unaware_keys)
|
|
||||||
.await
|
|
||||||
.change_context(errors::RedisError::GetFailed)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -237,6 +237,7 @@ pub struct RedisConfig {
|
|||||||
default_ttl: u32,
|
default_ttl: u32,
|
||||||
default_stream_read_count: u64,
|
default_stream_read_count: u64,
|
||||||
default_hash_ttl: u32,
|
default_hash_ttl: u32,
|
||||||
|
cluster_enabled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&RedisSettings> for RedisConfig {
|
impl From<&RedisSettings> for RedisConfig {
|
||||||
@ -245,6 +246,7 @@ impl From<&RedisSettings> for RedisConfig {
|
|||||||
default_ttl: config.default_ttl,
|
default_ttl: config.default_ttl,
|
||||||
default_stream_read_count: config.stream_read_count,
|
default_stream_read_count: config.stream_read_count,
|
||||||
default_hash_ttl: config.default_hash_ttl,
|
default_hash_ttl: config.default_hash_ttl,
|
||||||
|
cluster_enabled: config.cluster_enabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user