From 21f2ccd47c3627c760ade1b5fe90c3c13a46210e Mon Sep 17 00:00:00 2001 From: Jeeva Date: Wed, 21 Jun 2023 08:40:03 +0100 Subject: [PATCH] feat(router): add route to invalidate cache entry (#1100) Co-authored-by: jeeva Co-authored-by: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com> --- crates/drainer/src/utils.rs | 13 ++-- crates/redis_interface/src/commands.rs | 91 ++++++++++++++++++++------ crates/redis_interface/src/errors.rs | 2 +- crates/redis_interface/src/types.rs | 19 ++++++ crates/router/src/cache.rs | 28 ++++++-- crates/router/src/core.rs | 1 + crates/router/src/core/cache.rs | 25 +++++++ crates/router/src/db/queue.rs | 2 +- crates/router/src/lib.rs | 2 + crates/router/src/routes.rs | 7 +- crates/router/src/routes/app.rs | 17 +++-- crates/router/src/routes/cache.rs | 29 ++++++++ crates/router/tests/cache.rs | 76 +++++++++++++++++++++ crates/router_env/src/logger/types.rs | 2 + 14 files changed, 271 insertions(+), 43 deletions(-) create mode 100644 crates/router/src/core/cache.rs create mode 100644 crates/router/src/routes/cache.rs create mode 100644 crates/router/tests/cache.rs diff --git a/crates/drainer/src/utils.rs b/crates/drainer/src/utils.rs index c06615759f..7ba1018595 100644 --- a/crates/drainer/src/utils.rs +++ b/crates/drainer/src/utils.rs @@ -96,11 +96,14 @@ pub async fn make_stream_available( stream_name_flag: &str, redis: &redis::RedisConnectionPool, ) -> errors::DrainerResult<()> { - redis - .delete_key(stream_name_flag) - .await - .map_err(DrainerError::from) - .into_report() + match redis.delete_key(stream_name_flag).await { + Ok(redis::DelReply::KeyDeleted) => Ok(()), + Ok(redis::DelReply::KeyNotDeleted) => { + logger::error!("Tried to unlock a stream which is already unlocked"); + Ok(()) + } + Err(error) => Err(DrainerError::from(error).into()), + } } pub fn parse_stream_entries<'a>( diff --git a/crates/redis_interface/src/commands.rs b/crates/redis_interface/src/commands.rs index 1547158e6e..f88870a982 100644 --- a/crates/redis_interface/src/commands.rs +++ b/crates/redis_interface/src/commands.rs @@ -26,7 +26,7 @@ use router_env::{instrument, logger, tracing}; use crate::{ errors, - types::{HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply}, + types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply}, }; impl super::RedisConnectionPool { @@ -148,7 +148,7 @@ impl super::RedisConnectionPool { } #[instrument(level = "DEBUG", skip(self))] - pub async fn delete_key(&self, key: &str) -> CustomResult<(), errors::RedisError> { + pub async fn delete_key(&self, key: &str) -> CustomResult { self.pool .del(key) .await @@ -664,30 +664,81 @@ impl super::RedisConnectionPool { #[cfg(test)] mod tests { - #![allow(clippy::unwrap_used)] + #![allow(clippy::expect_used, clippy::unwrap_used)] use crate::{errors::RedisError, RedisConnectionPool, RedisEntryId, RedisSettings}; #[tokio::test] async fn test_consumer_group_create() { - let redis_conn = RedisConnectionPool::new(&RedisSettings::default()) - .await - .unwrap(); + let is_invalid_redis_entry_error = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let redis_conn = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); - let result1 = redis_conn - .consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID) - .await; - let result2 = redis_conn - .consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID) - .await; + // Act + let result1 = redis_conn + .consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID) + .await; - assert!(matches!( - result1.unwrap_err().current_context(), - RedisError::InvalidRedisEntryId - )); - assert!(matches!( - result2.unwrap_err().current_context(), - RedisError::InvalidRedisEntryId - )); + let result2 = redis_conn + .consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID) + .await; + + // Assert Setup + *result1.unwrap_err().current_context() == RedisError::InvalidRedisEntryId + && *result2.unwrap_err().current_context() == RedisError::InvalidRedisEntryId + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_invalid_redis_entry_error); + } + + #[tokio::test] + async fn test_delete_existing_key_success() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + let _ = pool.set_key("key", "value".to_string()).await; + + // Act + let result = pool.delete_key("key").await; + + // Assert setup + result.is_ok() + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); + } + + #[tokio::test] + async fn test_delete_non_existing_key_success() { + let is_success = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async { + // Arrange + let pool = RedisConnectionPool::new(&RedisSettings::default()) + .await + .expect("failed to create redis connection pool"); + + // Act + let result = pool.delete_key("key not exists").await; + + // Assert Setup + result.is_ok() + }) + }) + .await + .expect("Spawn block failure"); + + assert!(is_success); } } diff --git a/crates/redis_interface/src/errors.rs b/crates/redis_interface/src/errors.rs index b506957b9b..fa11874e60 100644 --- a/crates/redis_interface/src/errors.rs +++ b/crates/redis_interface/src/errors.rs @@ -2,7 +2,7 @@ //! Errors specific to this custom redis interface //! -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, PartialEq)] pub enum RedisError { #[error("Invalid Redis configuration: {0}")] InvalidConfiguration(String), diff --git a/crates/redis_interface/src/types.rs b/crates/redis_interface/src/types.rs index c5b82ec2f4..5dc9307001 100644 --- a/crates/redis_interface/src/types.rs +++ b/crates/redis_interface/src/types.rs @@ -226,3 +226,22 @@ impl From for fred::types::XCapTrim { } } } + +#[derive(Debug)] +pub enum DelReply { + KeyDeleted, + KeyNotDeleted, // Key not found +} + +impl fred::types::FromRedis for DelReply { + fn from_value(value: fred::types::RedisValue) -> Result { + match value { + fred::types::RedisValue::Integer(1) => Ok(Self::KeyDeleted), + fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotDeleted), + _ => Err(fred::error::RedisError::new( + fred::error::RedisErrorKind::Unknown, + "Unexpected del command reply", + )), + } + } +} diff --git a/crates/router/src/cache.rs b/crates/router/src/cache.rs index a3a4d32596..c8907dad28 100644 --- a/crates/router/src/cache.rs +++ b/crates/router/src/cache.rs @@ -117,6 +117,10 @@ impl Cache { let val = self.get(key)?; (*val).as_any().downcast_ref::().cloned() } + + pub async fn remove(&self, key: &str) { + self.invalidate(key).await; + } } #[cfg(test)] @@ -130,6 +134,23 @@ mod cache_tests { assert_eq!(cache.get_val::("key"), Some(String::from("val"))); } + #[tokio::test] + async fn eviction_on_size_test() { + let cache = Cache::new(2, 2, Some(0)); + cache.push("key".to_string(), "val".to_string()).await; + assert_eq!(cache.get_val::("key"), None); + } + + #[tokio::test] + async fn invalidate_cache_for_key() { + let cache = Cache::new(1800, 1800, None); + cache.push("key".to_string(), "val".to_string()).await; + + cache.remove("key").await; + + assert_eq!(cache.get_val::("key"), None); + } + #[tokio::test] async fn eviction_on_time_test() { let cache = Cache::new(2, 2, None); @@ -137,11 +158,4 @@ mod cache_tests { tokio::time::sleep(std::time::Duration::from_secs(3)).await; assert_eq!(cache.get_val::("key"), None); } - - #[tokio::test] - async fn eviction_on_size_test() { - let cache = Cache::new(2, 2, Some(0)); - cache.push("key".to_string(), "val".to_string()).await; - assert_eq!(cache.get_val::("key"), None); - } } diff --git a/crates/router/src/core.rs b/crates/router/src/core.rs index ada93005f6..3ade4a3346 100644 --- a/crates/router/src/core.rs +++ b/crates/router/src/core.rs @@ -1,5 +1,6 @@ pub mod admin; pub mod api_keys; +pub mod cache; pub mod cards_info; pub mod configs; pub mod customers; diff --git a/crates/router/src/core/cache.rs b/crates/router/src/core/cache.rs new file mode 100644 index 0000000000..b8c3593a7a --- /dev/null +++ b/crates/router/src/core/cache.rs @@ -0,0 +1,25 @@ +use common_utils::errors::CustomResult; +use redis_interface::DelReply; + +use super::errors; +use crate::{ + cache::{ACCOUNTS_CACHE, CONFIG_CACHE}, + db::StorageInterface, + services, +}; + +pub async fn invalidate( + store: &dyn StorageInterface, + key: &str, +) -> CustomResult, errors::ApiErrorResponse> { + CONFIG_CACHE.remove(key).await; + ACCOUNTS_CACHE.remove(key).await; + + match store.get_redis_conn().delete_key(key).await { + Ok(DelReply::KeyDeleted) => Ok(services::api::ApplicationResponse::StatusOk), + Ok(DelReply::KeyNotDeleted) => Err(errors::ApiErrorResponse::InvalidRequestUrl.into()), + Err(error) => Err(error + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to invalidate cache")), + } +} diff --git a/crates/router/src/db/queue.rs b/crates/router/src/db/queue.rs index 590dd55918..ff246e1ec2 100644 --- a/crates/router/src/db/queue.rs +++ b/crates/router/src/db/queue.rs @@ -109,7 +109,7 @@ impl QueueInterface for Store { async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult { let is_lock_released = self.redis_conn()?.delete_key(lock_key).await; Ok(match is_lock_released { - Ok(()) => true, + Ok(_del_reply) => true, Err(error) => { logger::error!(error=%error.current_context(), %tag, "Error while releasing lock"); false diff --git a/crates/router/src/lib.rs b/crates/router/src/lib.rs index a81a619ceb..34550a3a52 100644 --- a/crates/router/src/lib.rs +++ b/crates/router/src/lib.rs @@ -138,7 +138,9 @@ pub fn mk_app( server_app = server_app.service(routes::StripeApis::server(state.clone())); } server_app = server_app.service(routes::Cards::server(state.clone())); + server_app = server_app.service(routes::Cache::server(state.clone())); server_app = server_app.service(routes::Health::server(state)); + server_app } diff --git a/crates/router/src/routes.rs b/crates/router/src/routes.rs index f97484ca34..8596be380d 100644 --- a/crates/router/src/routes.rs +++ b/crates/router/src/routes.rs @@ -1,6 +1,7 @@ pub mod admin; pub mod api_keys; pub mod app; +pub mod cache; pub mod cards_info; pub mod configs; pub mod customers; @@ -21,9 +22,9 @@ pub mod webhooks; #[cfg(feature = "dummy_connector")] pub use self::app::DummyConnector; pub use self::app::{ - ApiKeys, AppState, Cards, Configs, Customers, Disputes, EphemeralKey, Files, Health, Mandates, - MerchantAccount, MerchantConnectorAccount, PaymentMethods, Payments, Payouts, Refunds, - Webhooks, + ApiKeys, AppState, Cache, Cards, Configs, Customers, Disputes, EphemeralKey, Files, Health, + Mandates, MerchantAccount, MerchantConnectorAccount, PaymentMethods, Payments, Payouts, + Refunds, Webhooks, }; #[cfg(feature = "stripe")] pub use super::compatibility::stripe::StripeApis; diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 44cfaa9984..f153850ce8 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -5,9 +5,9 @@ use tokio::sync::oneshot; #[cfg(feature = "dummy_connector")] use super::dummy_connector::*; -use super::health::*; #[cfg(feature = "olap")] use super::{admin::*, api_keys::*, disputes::*, files::*}; +use super::{cache::*, health::*}; #[cfg(any(feature = "olap", feature = "oltp"))] use super::{configs::*, customers::*, mandates::*, payments::*, payouts::*, refunds::*}; #[cfg(feature = "oltp")] @@ -424,7 +424,6 @@ impl Configs { pub fn server(config: AppState) -> Scope { web::scope("/configs") .app_data(web::Data::new(config)) - .service(web::resource("/").route(web::post().to(config_key_create))) .service( web::resource("/{key}") .route(web::get().to(config_key_retrieve)) @@ -465,10 +464,6 @@ impl Disputes { .route(web::post().to(submit_dispute_evidence)) .route(web::put().to(attach_dispute_evidence)), ) - .service( - web::resource("/evidence/{dispute_id}") - .route(web::get().to(retrieve_dispute_evidence)), - ) .service(web::resource("/{dispute_id}").route(web::get().to(retrieve_dispute))) } } @@ -498,3 +493,13 @@ impl Files { ) } } + +pub struct Cache; + +impl Cache { + pub fn server(state: AppState) -> Scope { + web::scope("/cache") + .app_data(web::Data::new(state)) + .service(web::resource("/invalidate/{key}").route(web::post().to(invalidate))) + } +} diff --git a/crates/router/src/routes/cache.rs b/crates/router/src/routes/cache.rs new file mode 100644 index 0000000000..4a54a9bb62 --- /dev/null +++ b/crates/router/src/routes/cache.rs @@ -0,0 +1,29 @@ +use actix_web::{web, HttpRequest, Responder}; +use router_env::{instrument, tracing, Flow}; + +use super::AppState; +use crate::{ + core::cache, + services::{api, authentication as auth}, +}; + +#[instrument(skip_all)] +pub async fn invalidate( + state: web::Data, + req: HttpRequest, + key: web::Path, +) -> impl Responder { + let flow = Flow::CacheInvalidate; + + let key = key.into_inner().to_owned(); + + api::server_wrap( + flow, + state.get_ref(), + &req, + &key, + |state, _, key| cache::invalidate(&*state.store, key), + &auth::AdminApiAuth, + ) + .await +} diff --git a/crates/router/tests/cache.rs b/crates/router/tests/cache.rs new file mode 100644 index 0000000000..e918d0f4c7 --- /dev/null +++ b/crates/router/tests/cache.rs @@ -0,0 +1,76 @@ +#![allow(clippy::unwrap_used)] +use router::{ + cache::{self}, + configs::settings::Settings, + routes, +}; + +mod utils; + +#[actix_web::test] +async fn invalidate_existing_cache_success() { + // Arrange + utils::setup().await; + let (tx, _) = tokio::sync::oneshot::channel(); + let state = routes::AppState::new(Settings::default(), tx).await; + + let cache_key = "cacheKey".to_string(); + let cache_key_value = "val".to_string(); + let _ = state + .store + .get_redis_conn() + .set_key(&cache_key.clone(), cache_key_value.clone()) + .await; + + let api_key = ("api-key", "test_admin"); + let client = awc::Client::default(); + + cache::CONFIG_CACHE + .push(cache_key.clone(), cache_key_value.clone()) + .await; + + cache::ACCOUNTS_CACHE + .push(cache_key.clone(), cache_key_value.clone()) + .await; + + // Act + let mut response = client + .post(format!( + "http://127.0.0.1:8080/cache/invalidate/{cache_key}" + )) + .insert_header(api_key) + .send() + .await + .unwrap(); + + // Assert + let response_body = response.body().await; + println!("invalidate Cache: {response:?} : {response_body:?}"); + assert_eq!(response.status(), awc::http::StatusCode::OK); + assert!(cache::CONFIG_CACHE.get(&cache_key).is_none()); + assert!(cache::ACCOUNTS_CACHE.get(&cache_key).is_none()); +} + +#[actix_web::test] +async fn invalidate_non_existing_cache_success() { + // Arrange + utils::setup().await; + let cache_key = "cacheKey".to_string(); + let api_key = ("api-key", "test_admin"); + let client = awc::Client::default(); + + // Act + let mut response = client + .post(format!( + "http://127.0.0.1:8080/cache/invalidate/{cache_key}" + )) + .insert_header(api_key) + .send() + .await + .unwrap(); + + // Assert + let response_body = response.body().await; + println!("invalidate Cache: {response:?} : {response_body:?}"); + assert_eq!(response.status(), awc::http::StatusCode::NOT_FOUND); +} diff --git a/crates/router_env/src/logger/types.rs b/crates/router_env/src/logger/types.rs index 8ccfb73011..b9fd06876e 100644 --- a/crates/router_env/src/logger/types.rs +++ b/crates/router_env/src/logger/types.rs @@ -184,6 +184,8 @@ pub enum Flow { AttachDisputeEvidence, /// Retrieve Dispute Evidence flow RetrieveDisputeEvidence, + /// Invalidate cache flow + CacheInvalidate, } ///