From 2f7cd4f752d98e23910c37502d8748e9842682ef Mon Sep 17 00:00:00 2001 From: Hrithikesh <61539176+hrithikesh026@users.noreply.github.com> Date: Wed, 13 Aug 2025 13:13:15 +0530 Subject: [PATCH] feat(core): add support for api locking with multiple keys for a single api (#8887) --- crates/redis_interface/src/commands.rs | 517 ++++++++++++++++++++++++- crates/redis_interface/src/lib.rs | 10 +- crates/redis_interface/src/types.rs | 25 ++ crates/router/src/core/api_locking.rs | 98 ++++- crates/router/src/routes/payments.rs | 31 +- 5 files changed, 663 insertions(+), 18 deletions(-) diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index 9cb413bc78..68d27ba5a7 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -25,7 +25,10 @@ use tracing::instrument; use crate::{ errors, - types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, RedisKey, SaddReply, SetnxReply}, + types::{ + DelReply, HsetnxReply, MsetnxReply, RedisEntryId, RedisKey, SaddReply, SetGetReply, + SetnxReply, + }, }; impl super::RedisConnectionPool { @@ -195,6 +198,50 @@ impl super::RedisConnectionPool { } } + #[instrument(level = "DEBUG", skip(self))] + pub async fn get_multiple_keys( + &self, + keys: &[RedisKey], + ) -> CustomResult>, errors::RedisError> + where + V: FromRedis + Unpin + Send + 'static, + { + if keys.is_empty() { + return Ok(Vec::new()); + } + + let tenant_aware_keys: Vec = + keys.iter().map(|key| key.tenant_aware_key(self)).collect(); + + match self + .pool + .mget(tenant_aware_keys.clone()) + .await + .change_context(errors::RedisError::GetFailed) + { + Ok(values) => Ok(values), + Err(_err) => { + #[cfg(not(feature = "multitenancy_fallback"))] + { + Err(_err) + } + + #[cfg(feature = "multitenancy_fallback")] + { + let tenant_unaware_keys: Vec = keys + .iter() + .map(|key| key.tenant_unaware_key(self)) + .collect(); + + self.pool + .mget(tenant_unaware_keys) + .await + .change_context(errors::RedisError::GetFailed) + } + } + } + } + #[instrument(level = "DEBUG", skip(self))] pub async fn exists(&self, key: &RedisKey) -> CustomResult where @@ -242,6 +289,37 @@ impl super::RedisConnectionPool { .change_context(errors::RedisError::JsonDeserializationFailed) } + #[instrument(level = "DEBUG", skip(self))] + pub async fn get_and_deserialize_multiple_keys( + &self, + keys: &[RedisKey], + type_name: &'static str, + ) -> CustomResult>, errors::RedisError> + where + T: serde::de::DeserializeOwned, + { + let value_bytes_vec = self.get_multiple_keys::>(keys).await?; + + let mut results = Vec::with_capacity(value_bytes_vec.len()); + for value_bytes_opt in value_bytes_vec { + match value_bytes_opt { + Some(value_bytes) => { + if value_bytes.is_empty() { + results.push(None); + } else { + let parsed = value_bytes + .parse_struct(type_name) + .change_context(errors::RedisError::JsonDeserializationFailed)?; + results.push(Some(parsed)); + } + } + None => results.push(None), + } + } + + Ok(results) + } + #[instrument(level = "DEBUG", skip(self))] pub async fn delete_key(&self, key: &RedisKey) -> CustomResult { match self @@ -966,6 +1044,102 @@ impl super::RedisConnectionPool { .change_context(errors::RedisError::IncrementHashFieldFailed)?; Ok(val) } + + #[instrument(level = "DEBUG", skip(self))] + pub async fn set_multiple_keys_if_not_exists_and_get_values( + &self, + keys: &[(RedisKey, V)], + ttl: Option, + ) -> CustomResult>, errors::RedisError> + where + V: TryInto + + Debug + + FromRedis + + ToOwned + + Send + + Sync + + serde::de::DeserializeOwned, + V::Error: Into + Send + Sync, + { + let futures = keys.iter().map(|(key, value)| { + self.set_key_if_not_exists_and_get_value(key, (*value).to_owned(), ttl) + }); + + let del_result = futures::future::try_join_all(futures) + .await + .change_context(errors::RedisError::SetFailed)?; + + Ok(del_result) + } + + /// Sets a value in Redis if not already present, and returns the value (either existing or newly set). + /// This operation is atomic using Redis transactions. + #[instrument(level = "DEBUG", skip(self))] + pub async fn set_key_if_not_exists_and_get_value( + &self, + key: &RedisKey, + value: V, + ttl: Option, + ) -> CustomResult, errors::RedisError> + where + V: TryInto + Debug + FromRedis + Send + Sync + serde::de::DeserializeOwned, + V::Error: Into + Send + Sync, + { + let redis_key = key.tenant_aware_key(self); + let ttl_seconds = ttl.unwrap_or(self.config.default_ttl.into()); + + // Get a client from the pool and start transaction + let trx = self.get_transaction(); + + // Try to set if not exists with expiry - queue the command + trx.set::<(), _, _>( + &redis_key, + value, + Some(Expiration::EX(ttl_seconds)), + Some(SetOptions::NX), + false, + ) + .await + .change_context(errors::RedisError::SetFailed) + .attach_printable("Failed to queue set command")?; + + // Always get the value after the SET attempt - queue the command + trx.get::(&redis_key) + .await + .change_context(errors::RedisError::GetFailed) + .attach_printable("Failed to queue get command")?; + + // Execute transaction + let mut results: Vec = trx + .exec(true) + .await + .change_context(errors::RedisError::SetFailed) + .attach_printable("Failed to execute the redis transaction")?; + + let msg = "Got unexpected number of results from transaction"; + let get_result = results + .pop() + .ok_or(errors::RedisError::SetFailed) + .attach_printable(msg)?; + let set_result = results + .pop() + .ok_or(errors::RedisError::SetFailed) + .attach_printable(msg)?; + // Parse the GET result to get the actual value + let actual_value: V = FromRedis::from_value(get_result) + .change_context(errors::RedisError::SetFailed) + .attach_printable("Failed to convert from redis value")?; + + // Check if SET NX succeeded or failed + match set_result { + // SET NX returns "OK" if key was set + RedisValue::String(_) => Ok(SetGetReply::ValueSet(actual_value)), + // SET NX returns null if key already exists + RedisValue::Null => Ok(SetGetReply::ValueExists(actual_value)), + _ => Err(report!(errors::RedisError::SetFailed)) + .attach_printable("Unexpected result from SET NX operation"), + } + } } #[cfg(test)] @@ -1099,6 +1273,13 @@ mod tests { let pool = RedisConnectionPool::new(&RedisSettings::default()) .await .expect("failed to create redis connection pool"); + + // First set some keys + for i in 0..3 { + let key = format!("script_test_key{i}").into(); + let _ = pool.set_key(&key, format!("value{i}")).await; + } + let lua_script = r#" local results = {} for i = 1, #KEYS do @@ -1106,16 +1287,16 @@ mod tests { end return results "#; - let mut keys_and_values = HashMap::new(); - for i in 0..10 { - keys_and_values.insert(format!("key{i}"), i); - } - let key = keys_and_values.keys().cloned().collect::>(); + let keys = vec![ + "script_test_key0".to_string(), + "script_test_key1".to_string(), + "script_test_key2".to_string(), + ]; // Act let result = pool - .evaluate_redis_script::<_, String>(lua_script, key, 0) + .evaluate_redis_script::<_, Vec>(lua_script, keys, vec![""]) .await; // Assert Setup @@ -1127,4 +1308,326 @@ mod tests { assert!(is_success); } + + #[tokio::test] + async fn test_set_key_if_not_exists_and_get_value_new_key() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + let key = "test_new_key_string".into(); + let value = "test_value".to_string(); + + // Act + let result = pool + .set_key_if_not_exists_and_get_value(&key, value.clone(), Some(30)) + .await; + + // Assert + match result { + Ok(crate::types::SetGetReply::ValueSet(returned_value)) => { + returned_value == value + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_set_key_if_not_exists_and_get_value_existing_key() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + let key = "test_existing_key_string".into(); + let initial_value = "initial_value".to_string(); + let new_value = "new_value".to_string(); + + // First, set an initial value using regular set_key + let _ = pool.set_key(&key, initial_value.clone()).await; + + // Act - try to set a new value (should fail and return existing value) + let result = pool + .set_key_if_not_exists_and_get_value(&key, new_value, Some(30)) + .await; + + // Assert + match result { + Ok(crate::types::SetGetReply::ValueExists(returned_value)) => { + returned_value == initial_value + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_set_key_if_not_exists_and_get_value_with_default_ttl() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + let key = "test_default_ttl_key_string".into(); + let value = "test_value".to_string(); + + // Act - use None for TTL to test default behavior + let result = pool + .set_key_if_not_exists_and_get_value(&key, value.clone(), None) + .await; + + // Assert + match result { + Ok(crate::types::SetGetReply::ValueSet(returned_value)) => { + returned_value == value + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_set_key_if_not_exists_and_get_value_concurrent_access() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + let key_name = "test_concurrent_key_string"; + let value1 = "value1".to_string(); + let value2 = "value2".to_string(); + + // Act - simulate concurrent access + let pool1 = pool.clone(""); + let pool2 = pool.clone(""); + let key1 = key_name.into(); + let key2 = key_name.into(); + + let (result1, result2) = tokio::join!( + pool1.set_key_if_not_exists_and_get_value(&key1, value1, Some(30)), + pool2.set_key_if_not_exists_and_get_value(&key2, value2, Some(30)) + ); + + // Assert - one should succeed with ValueSet, one should fail with ValueExists + let result1_is_set = matches!(result1, Ok(crate::types::SetGetReply::ValueSet(_))); + let result2_is_set = matches!(result2, Ok(crate::types::SetGetReply::ValueSet(_))); + let result1_is_exists = + matches!(result1, Ok(crate::types::SetGetReply::ValueExists(_))); + let result2_is_exists = + matches!(result2, Ok(crate::types::SetGetReply::ValueExists(_))); + + // Exactly one should be ValueSet and one should be ValueExists + (result1_is_set && result2_is_exists) || (result1_is_exists && result2_is_set) + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_get_multiple_keys_success() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + + // Set up test data + let keys = vec![ + "multi_test_key1".into(), + "multi_test_key2".into(), + "multi_test_key3".into(), + ]; + let values = ["value1", "value2", "value3"]; + + // Set the keys + for (key, value) in keys.iter().zip(values.iter()) { + let _ = pool.set_key(key, value.to_string()).await; + } + + // Act + let result = pool.get_multiple_keys::(&keys).await; + + // Assert + match result { + Ok(retrieved_values) => { + retrieved_values.len() == 3 + && retrieved_values.first() == Some(&Some("value1".to_string())) + && retrieved_values.get(1) == Some(&Some("value2".to_string())) + && retrieved_values.get(2) == Some(&Some("value3".to_string())) + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_get_multiple_keys_with_missing_keys() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + + let keys = vec![ + "existing_key".into(), + "non_existing_key".into(), + "another_existing_key".into(), + ]; + + // Set only some keys + let _ = pool + .set_key( + keys.first().expect("should not be none"), + "value1".to_string(), + ) + .await; + let _ = pool + .set_key( + keys.get(2).expect("should not be none"), + "value3".to_string(), + ) + .await; + + // Act + let result = pool.get_multiple_keys::(&keys).await; + + // Assert + match result { + Ok(retrieved_values) => { + retrieved_values.len() == 3 + && *retrieved_values.first().expect("should not be none") + == Some("value1".to_string()) + && retrieved_values.get(1).is_some_and(|v| v.is_none()) + && *retrieved_values.get(2).expect("should not be none") + == Some("value3".to_string()) + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_get_multiple_keys_empty_input() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + + let keys: Vec = vec![]; + + // Act + let result = pool.get_multiple_keys::(&keys).await; + + // Assert + match result { + Ok(retrieved_values) => retrieved_values.is_empty(), + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_get_and_deserialize_multiple_keys() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + + #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)] + struct TestData { + id: u32, + name: String, + } + + let test_data = [ + TestData { + id: 1, + name: "test1".to_string(), + }, + TestData { + id: 2, + name: "test2".to_string(), + }, + ]; + + let keys = vec![ + "serialize_test_key1".into(), + "serialize_test_key2".into(), + "non_existing_serialize_key".into(), + ]; + + // Set serialized data for first two keys + for (i, data) in test_data.iter().enumerate() { + let _ = pool + .serialize_and_set_key(keys.get(i).expect("should not be none"), data) + .await; + } + + // Act + let result = pool + .get_and_deserialize_multiple_keys::(&keys, "TestData") + .await; + + // Assert + match result { + Ok(retrieved_data) => { + retrieved_data.len() == 3 + && retrieved_data.first() == Some(&Some(test_data[0].clone())) + && retrieved_data.get(1) == Some(&Some(test_data[1].clone())) + && retrieved_data.get(2) == Some(&None) + } + _ => false, + } + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } } diff --git a/crates/redis_interface/src/lib.rs b/crates/redis_interface/src/lib.rs index 3ef6b16bdb..0951863757 100644 --- a/crates/redis_interface/src/lib.rs +++ b/crates/redis_interface/src/lib.rs @@ -24,7 +24,11 @@ use std::sync::{atomic, Arc}; use common_utils::errors::CustomResult; use error_stack::ResultExt; pub use fred::interfaces::PubsubInterface; -use fred::{interfaces::ClientLike, prelude::EventInterface}; +use fred::{ + clients::Transaction, + interfaces::ClientLike, + prelude::{EventInterface, TransactionInterface}, +}; pub use self::types::*; @@ -223,6 +227,10 @@ impl RedisConnectionPool { }) }); } + + pub fn get_transaction(&self) -> Transaction { + self.pool.next().multi() + } } pub struct RedisConfig { diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index 848996deb4..4f966dc90a 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -262,6 +262,16 @@ pub enum DelReply { KeyNotDeleted, // Key not found } +impl DelReply { + pub fn is_key_deleted(&self) -> bool { + matches!(self, Self::KeyDeleted) + } + + pub fn is_key_not_deleted(&self) -> bool { + matches!(self, Self::KeyNotDeleted) + } +} + impl fred::types::FromRedis for DelReply { fn from_value(value: fred::types::RedisValue) -> Result { match value { @@ -294,6 +304,21 @@ impl fred::types::FromRedis for SaddReply { } } +#[derive(Debug)] +pub enum SetGetReply { + ValueSet(T), // Value was set and this is the value that was set + ValueExists(T), // Value already existed and this is the existing value +} + +impl SetGetReply { + pub fn get_value(&self) -> &T { + match self { + Self::ValueSet(value) => value, + Self::ValueExists(value) => value, + } + } +} + #[derive(Debug)] pub struct RedisKey(String); diff --git a/crates/router/src/core/api_locking.rs b/crates/router/src/core/api_locking.rs index 7043a090b2..60fc9ae0a3 100644 --- a/crates/router/src/core/api_locking.rs +++ b/crates/router/src/core/api_locking.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use actix_web::rt::time as actix_time; use error_stack::{report, ResultExt}; -use redis_interface as redis; +use redis_interface::{self as redis, RedisKey}; use router_env::{instrument, logger, tracing}; use super::errors::{self, RouterResult}; @@ -22,6 +22,8 @@ pub enum LockStatus { pub enum LockAction { // Sleep until the lock is acquired Hold { input: LockingInput }, + // Sleep until all locks are acquired + HoldMultiple { inputs: Vec }, // Queue it but return response as 2xx, could be used for webhooks QueueWithOk, // Return Error @@ -38,7 +40,7 @@ pub struct LockingInput { } impl LockingInput { - fn get_redis_locking_key(&self, merchant_id: common_utils::id_type::MerchantId) -> String { + fn get_redis_locking_key(&self, merchant_id: &common_utils::id_type::MerchantId) -> String { format!( "{}_{}_{}_{}", API_LOCK_PREFIX, @@ -60,13 +62,50 @@ impl LockAction { A: SessionStateInfo, { match self { + Self::HoldMultiple { inputs } => { + let lock_retries = inputs + .iter() + .find_map(|input| input.override_lock_retries) + .unwrap_or(state.conf().lock_settings.lock_retries); + let request_id = state.get_request_id(); + let redis_lock_expiry_seconds = + state.conf().lock_settings.redis_lock_expiry_seconds; + let redis_conn = state + .store() + .get_redis_conn() + .change_context(errors::ApiErrorResponse::InternalServerError)?; + let redis_key_values = inputs + .iter() + .map(|input| input.get_redis_locking_key(&merchant_id)) + .map(|key| (RedisKey::from(key.as_str()), request_id.clone())) + .collect::>(); + for _retry in 0..lock_retries { + let results: Vec> = redis_conn + .set_multiple_keys_if_not_exists_and_get_values( + &redis_key_values, + Some(i64::from(redis_lock_expiry_seconds)), + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + let lock_aqcuired = results.iter().all(|res| { + // each redis value must match the request_id + // if even 1 does match, the lock is not acquired + *res.get_value() == request_id + }); + if lock_aqcuired { + logger::info!("Lock acquired for locking inputs {:?}", inputs); + return Ok(()); + } + } + Err(report!(errors::ApiErrorResponse::ResourceBusy)) + } Self::Hold { input } => { let redis_conn = state .store() .get_redis_conn() .change_context(errors::ApiErrorResponse::InternalServerError)?; - let redis_locking_key = input.get_redis_locking_key(merchant_id); + let redis_locking_key = input.get_redis_locking_key(&merchant_id); let delay_between_retries_in_milliseconds = state .conf() .lock_settings @@ -125,13 +164,64 @@ impl LockAction { A: SessionStateInfo, { match self { + Self::HoldMultiple { inputs } => { + let redis_conn = state + .store() + .get_redis_conn() + .change_context(errors::ApiErrorResponse::InternalServerError)?; + + let redis_locking_keys = inputs + .iter() + .map(|input| RedisKey::from(input.get_redis_locking_key(&merchant_id).as_str())) + .collect::>(); + let request_id = state.get_request_id(); + let values = redis_conn + .get_multiple_keys::(&redis_locking_keys) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + + let invalid_request_id_list = values + .iter() + .filter(|redis_value| **redis_value != request_id) + .flatten() + .collect::>(); + + if !invalid_request_id_list.is_empty() { + logger::error!( + "The request_id which acquired the lock is not equal to the request_id requesting for releasing the lock. + Current request_id: {:?}, + Redis request_ids : {:?}", + request_id, + invalid_request_id_list + ); + Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("The request_id which acquired the lock is not equal to the request_id requesting for releasing the lock") + } else { + Ok(()) + }?; + let delete_result = redis_conn + .delete_multiple_keys(&redis_locking_keys) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; + let is_key_not_deleted = delete_result + .into_iter() + .any(|delete_reply| delete_reply.is_key_not_deleted()); + if is_key_not_deleted { + Err(errors::ApiErrorResponse::InternalServerError).attach_printable( + "Status release lock called but key is not found in redis", + ) + } else { + logger::info!("Lock freed for locking inputs {:?}", inputs); + Ok(()) + } + } Self::Hold { input } => { let redis_conn = state .store() .get_redis_conn() .change_context(errors::ApiErrorResponse::InternalServerError)?; - let redis_locking_key = input.get_redis_locking_key(merchant_id); + let redis_locking_key = input.get_redis_locking_key(&merchant_id); match redis_conn .get_key::>(&redis_locking_key.as_str().into()) diff --git a/crates/router/src/routes/payments.rs b/crates/router/src/routes/payments.rs index f9bee6bceb..72f184be72 100644 --- a/crates/router/src/routes/payments.rs +++ b/crates/router/src/routes/payments.rs @@ -2394,12 +2394,31 @@ impl GetLockingInput for payment_types::PaymentsRequest { { match self.payment_id { Some(payment_types::PaymentIdType::PaymentIntentId(ref id)) => { - api_locking::LockAction::Hold { - input: api_locking::LockingInput { - unique_locking_key: id.get_string_repr().to_owned(), - api_identifier: lock_utils::ApiIdentifier::from(flow), - override_lock_retries: None, - }, + let api_identifier = lock_utils::ApiIdentifier::from(flow); + let intent_id_locking_input = api_locking::LockingInput { + unique_locking_key: id.get_string_repr().to_owned(), + api_identifier: api_identifier.clone(), + override_lock_retries: None, + }; + if let Some(customer_id) = self + .customer_id + .as_ref() + .or(self.customer.as_ref().map(|customer| &customer.id)) + { + api_locking::LockAction::HoldMultiple { + inputs: vec![ + intent_id_locking_input, + api_locking::LockingInput { + unique_locking_key: customer_id.get_string_repr().to_owned(), + api_identifier, + override_lock_retries: None, + }, + ], + } + } else { + api_locking::LockAction::Hold { + input: intent_id_locking_input, + } } } _ => api_locking::LockAction::NotApplicable,