feat(core): add support for api locking with multiple keys for a single api (#8887)

This commit is contained in:
Hrithikesh
2025-08-13 13:13:15 +05:30
committed by GitHub
parent 5a09d7ec2a
commit 2f7cd4f752
5 changed files with 663 additions and 18 deletions

View File

@ -25,7 +25,10 @@ use tracing::instrument;
use crate::{
errors,
types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, RedisKey, SaddReply, SetnxReply},
types::{
DelReply, HsetnxReply, MsetnxReply, RedisEntryId, RedisKey, SaddReply, SetGetReply,
SetnxReply,
},
};
impl super::RedisConnectionPool {
@ -195,6 +198,50 @@ impl super::RedisConnectionPool {
}
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_multiple_keys<V>(
&self,
keys: &[RedisKey],
) -> CustomResult<Vec<Option<V>>, errors::RedisError>
where
V: FromRedis + Unpin + Send + 'static,
{
if keys.is_empty() {
return Ok(Vec::new());
}
let tenant_aware_keys: Vec<String> =
keys.iter().map(|key| key.tenant_aware_key(self)).collect();
match self
.pool
.mget(tenant_aware_keys.clone())
.await
.change_context(errors::RedisError::GetFailed)
{
Ok(values) => Ok(values),
Err(_err) => {
#[cfg(not(feature = "multitenancy_fallback"))]
{
Err(_err)
}
#[cfg(feature = "multitenancy_fallback")]
{
let tenant_unaware_keys: Vec<String> = keys
.iter()
.map(|key| key.tenant_unaware_key(self))
.collect();
self.pool
.mget(tenant_unaware_keys)
.await
.change_context(errors::RedisError::GetFailed)
}
}
}
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn exists<V>(&self, key: &RedisKey) -> CustomResult<bool, errors::RedisError>
where
@ -242,6 +289,37 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::JsonDeserializationFailed)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn get_and_deserialize_multiple_keys<T>(
&self,
keys: &[RedisKey],
type_name: &'static str,
) -> CustomResult<Vec<Option<T>>, errors::RedisError>
where
T: serde::de::DeserializeOwned,
{
let value_bytes_vec = self.get_multiple_keys::<Vec<u8>>(keys).await?;
let mut results = Vec::with_capacity(value_bytes_vec.len());
for value_bytes_opt in value_bytes_vec {
match value_bytes_opt {
Some(value_bytes) => {
if value_bytes.is_empty() {
results.push(None);
} else {
let parsed = value_bytes
.parse_struct(type_name)
.change_context(errors::RedisError::JsonDeserializationFailed)?;
results.push(Some(parsed));
}
}
None => results.push(None),
}
}
Ok(results)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn delete_key(&self, key: &RedisKey) -> CustomResult<DelReply, errors::RedisError> {
match self
@ -966,6 +1044,102 @@ impl super::RedisConnectionPool {
.change_context(errors::RedisError::IncrementHashFieldFailed)?;
Ok(val)
}
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_multiple_keys_if_not_exists_and_get_values<V>(
&self,
keys: &[(RedisKey, V)],
ttl: Option<i64>,
) -> CustomResult<Vec<SetGetReply<V>>, errors::RedisError>
where
V: TryInto<RedisValue>
+ Debug
+ FromRedis
+ ToOwned<Owned = V>
+ Send
+ Sync
+ serde::de::DeserializeOwned,
V::Error: Into<fred::error::RedisError> + Send + Sync,
{
let futures = keys.iter().map(|(key, value)| {
self.set_key_if_not_exists_and_get_value(key, (*value).to_owned(), ttl)
});
let del_result = futures::future::try_join_all(futures)
.await
.change_context(errors::RedisError::SetFailed)?;
Ok(del_result)
}
/// Sets a value in Redis if not already present, and returns the value (either existing or newly set).
/// This operation is atomic using Redis transactions.
#[instrument(level = "DEBUG", skip(self))]
pub async fn set_key_if_not_exists_and_get_value<V>(
&self,
key: &RedisKey,
value: V,
ttl: Option<i64>,
) -> CustomResult<SetGetReply<V>, errors::RedisError>
where
V: TryInto<RedisValue> + Debug + FromRedis + Send + Sync + serde::de::DeserializeOwned,
V::Error: Into<fred::error::RedisError> + Send + Sync,
{
let redis_key = key.tenant_aware_key(self);
let ttl_seconds = ttl.unwrap_or(self.config.default_ttl.into());
// Get a client from the pool and start transaction
let trx = self.get_transaction();
// Try to set if not exists with expiry - queue the command
trx.set::<(), _, _>(
&redis_key,
value,
Some(Expiration::EX(ttl_seconds)),
Some(SetOptions::NX),
false,
)
.await
.change_context(errors::RedisError::SetFailed)
.attach_printable("Failed to queue set command")?;
// Always get the value after the SET attempt - queue the command
trx.get::<V, _>(&redis_key)
.await
.change_context(errors::RedisError::GetFailed)
.attach_printable("Failed to queue get command")?;
// Execute transaction
let mut results: Vec<RedisValue> = trx
.exec(true)
.await
.change_context(errors::RedisError::SetFailed)
.attach_printable("Failed to execute the redis transaction")?;
let msg = "Got unexpected number of results from transaction";
let get_result = results
.pop()
.ok_or(errors::RedisError::SetFailed)
.attach_printable(msg)?;
let set_result = results
.pop()
.ok_or(errors::RedisError::SetFailed)
.attach_printable(msg)?;
// Parse the GET result to get the actual value
let actual_value: V = FromRedis::from_value(get_result)
.change_context(errors::RedisError::SetFailed)
.attach_printable("Failed to convert from redis value")?;
// Check if SET NX succeeded or failed
match set_result {
// SET NX returns "OK" if key was set
RedisValue::String(_) => Ok(SetGetReply::ValueSet(actual_value)),
// SET NX returns null if key already exists
RedisValue::Null => Ok(SetGetReply::ValueExists(actual_value)),
_ => Err(report!(errors::RedisError::SetFailed))
.attach_printable("Unexpected result from SET NX operation"),
}
}
}
#[cfg(test)]
@ -1099,6 +1273,13 @@ mod tests {
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
// First set some keys
for i in 0..3 {
let key = format!("script_test_key{i}").into();
let _ = pool.set_key(&key, format!("value{i}")).await;
}
let lua_script = r#"
local results = {}
for i = 1, #KEYS do
@ -1106,16 +1287,16 @@ mod tests {
end
return results
"#;
let mut keys_and_values = HashMap::new();
for i in 0..10 {
keys_and_values.insert(format!("key{i}"), i);
}
let key = keys_and_values.keys().cloned().collect::<Vec<_>>();
let keys = vec![
"script_test_key0".to_string(),
"script_test_key1".to_string(),
"script_test_key2".to_string(),
];
// Act
let result = pool
.evaluate_redis_script::<_, String>(lua_script, key, 0)
.evaluate_redis_script::<_, Vec<String>>(lua_script, keys, vec![""])
.await;
// Assert Setup
@ -1127,4 +1308,326 @@ mod tests {
assert!(is_success);
}
#[tokio::test]
async fn test_set_key_if_not_exists_and_get_value_new_key() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let key = "test_new_key_string".into();
let value = "test_value".to_string();
// Act
let result = pool
.set_key_if_not_exists_and_get_value(&key, value.clone(), Some(30))
.await;
// Assert
match result {
Ok(crate::types::SetGetReply::ValueSet(returned_value)) => {
returned_value == value
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_set_key_if_not_exists_and_get_value_existing_key() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let key = "test_existing_key_string".into();
let initial_value = "initial_value".to_string();
let new_value = "new_value".to_string();
// First, set an initial value using regular set_key
let _ = pool.set_key(&key, initial_value.clone()).await;
// Act - try to set a new value (should fail and return existing value)
let result = pool
.set_key_if_not_exists_and_get_value(&key, new_value, Some(30))
.await;
// Assert
match result {
Ok(crate::types::SetGetReply::ValueExists(returned_value)) => {
returned_value == initial_value
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_set_key_if_not_exists_and_get_value_with_default_ttl() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let key = "test_default_ttl_key_string".into();
let value = "test_value".to_string();
// Act - use None for TTL to test default behavior
let result = pool
.set_key_if_not_exists_and_get_value(&key, value.clone(), None)
.await;
// Assert
match result {
Ok(crate::types::SetGetReply::ValueSet(returned_value)) => {
returned_value == value
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_set_key_if_not_exists_and_get_value_concurrent_access() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let key_name = "test_concurrent_key_string";
let value1 = "value1".to_string();
let value2 = "value2".to_string();
// Act - simulate concurrent access
let pool1 = pool.clone("");
let pool2 = pool.clone("");
let key1 = key_name.into();
let key2 = key_name.into();
let (result1, result2) = tokio::join!(
pool1.set_key_if_not_exists_and_get_value(&key1, value1, Some(30)),
pool2.set_key_if_not_exists_and_get_value(&key2, value2, Some(30))
);
// Assert - one should succeed with ValueSet, one should fail with ValueExists
let result1_is_set = matches!(result1, Ok(crate::types::SetGetReply::ValueSet(_)));
let result2_is_set = matches!(result2, Ok(crate::types::SetGetReply::ValueSet(_)));
let result1_is_exists =
matches!(result1, Ok(crate::types::SetGetReply::ValueExists(_)));
let result2_is_exists =
matches!(result2, Ok(crate::types::SetGetReply::ValueExists(_)));
// Exactly one should be ValueSet and one should be ValueExists
(result1_is_set && result2_is_exists) || (result1_is_exists && result2_is_set)
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_get_multiple_keys_success() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
// Set up test data
let keys = vec![
"multi_test_key1".into(),
"multi_test_key2".into(),
"multi_test_key3".into(),
];
let values = ["value1", "value2", "value3"];
// Set the keys
for (key, value) in keys.iter().zip(values.iter()) {
let _ = pool.set_key(key, value.to_string()).await;
}
// Act
let result = pool.get_multiple_keys::<String>(&keys).await;
// Assert
match result {
Ok(retrieved_values) => {
retrieved_values.len() == 3
&& retrieved_values.first() == Some(&Some("value1".to_string()))
&& retrieved_values.get(1) == Some(&Some("value2".to_string()))
&& retrieved_values.get(2) == Some(&Some("value3".to_string()))
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_get_multiple_keys_with_missing_keys() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let keys = vec![
"existing_key".into(),
"non_existing_key".into(),
"another_existing_key".into(),
];
// Set only some keys
let _ = pool
.set_key(
keys.first().expect("should not be none"),
"value1".to_string(),
)
.await;
let _ = pool
.set_key(
keys.get(2).expect("should not be none"),
"value3".to_string(),
)
.await;
// Act
let result = pool.get_multiple_keys::<String>(&keys).await;
// Assert
match result {
Ok(retrieved_values) => {
retrieved_values.len() == 3
&& *retrieved_values.first().expect("should not be none")
== Some("value1".to_string())
&& retrieved_values.get(1).is_some_and(|v| v.is_none())
&& *retrieved_values.get(2).expect("should not be none")
== Some("value3".to_string())
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_get_multiple_keys_empty_input() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let keys: Vec<crate::types::RedisKey> = vec![];
// Act
let result = pool.get_multiple_keys::<String>(&keys).await;
// Assert
match result {
Ok(retrieved_values) => retrieved_values.is_empty(),
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_get_and_deserialize_multiple_keys() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)]
struct TestData {
id: u32,
name: String,
}
let test_data = [
TestData {
id: 1,
name: "test1".to_string(),
},
TestData {
id: 2,
name: "test2".to_string(),
},
];
let keys = vec![
"serialize_test_key1".into(),
"serialize_test_key2".into(),
"non_existing_serialize_key".into(),
];
// Set serialized data for first two keys
for (i, data) in test_data.iter().enumerate() {
let _ = pool
.serialize_and_set_key(keys.get(i).expect("should not be none"), data)
.await;
}
// Act
let result = pool
.get_and_deserialize_multiple_keys::<TestData>(&keys, "TestData")
.await;
// Assert
match result {
Ok(retrieved_data) => {
retrieved_data.len() == 3
&& retrieved_data.first() == Some(&Some(test_data[0].clone()))
&& retrieved_data.get(1) == Some(&Some(test_data[1].clone()))
&& retrieved_data.get(2) == Some(&None)
}
_ => false,
}
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
}

View File

@ -24,7 +24,11 @@ use std::sync::{atomic, Arc};
use common_utils::errors::CustomResult;
use error_stack::ResultExt;
pub use fred::interfaces::PubsubInterface;
use fred::{interfaces::ClientLike, prelude::EventInterface};
use fred::{
clients::Transaction,
interfaces::ClientLike,
prelude::{EventInterface, TransactionInterface},
};
pub use self::types::*;
@ -223,6 +227,10 @@ impl RedisConnectionPool {
})
});
}
pub fn get_transaction(&self) -> Transaction {
self.pool.next().multi()
}
}
pub struct RedisConfig {

View File

@ -262,6 +262,16 @@ pub enum DelReply {
KeyNotDeleted, // Key not found
}
impl DelReply {
pub fn is_key_deleted(&self) -> bool {
matches!(self, Self::KeyDeleted)
}
pub fn is_key_not_deleted(&self) -> bool {
matches!(self, Self::KeyNotDeleted)
}
}
impl fred::types::FromRedis for DelReply {
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
match value {
@ -294,6 +304,21 @@ impl fred::types::FromRedis for SaddReply {
}
}
#[derive(Debug)]
pub enum SetGetReply<T> {
ValueSet(T), // Value was set and this is the value that was set
ValueExists(T), // Value already existed and this is the existing value
}
impl<T> SetGetReply<T> {
pub fn get_value(&self) -> &T {
match self {
Self::ValueSet(value) => value,
Self::ValueExists(value) => value,
}
}
}
#[derive(Debug)]
pub struct RedisKey(String);