feat(router): add route to invalidate cache entry (#1100)

Co-authored-by: jeeva <jeeva.ramu@codurance.com>
Co-authored-by: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com>
This commit is contained in:
Jeeva
2023-06-21 08:40:03 +01:00
committed by GitHub
parent 2b71d4d8c4
commit 21f2ccd47c
14 changed files with 271 additions and 43 deletions

View File

@ -96,11 +96,14 @@ pub async fn make_stream_available(
stream_name_flag: &str, stream_name_flag: &str,
redis: &redis::RedisConnectionPool, redis: &redis::RedisConnectionPool,
) -> errors::DrainerResult<()> { ) -> errors::DrainerResult<()> {
redis match redis.delete_key(stream_name_flag).await {
.delete_key(stream_name_flag) Ok(redis::DelReply::KeyDeleted) => Ok(()),
.await Ok(redis::DelReply::KeyNotDeleted) => {
.map_err(DrainerError::from) logger::error!("Tried to unlock a stream which is already unlocked");
.into_report() Ok(())
}
Err(error) => Err(DrainerError::from(error).into()),
}
} }
pub fn parse_stream_entries<'a>( pub fn parse_stream_entries<'a>(

View File

@ -26,7 +26,7 @@ use router_env::{instrument, logger, tracing};
use crate::{ use crate::{
errors, errors,
types::{HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply}, types::{DelReply, HsetnxReply, MsetnxReply, RedisEntryId, SetnxReply},
}; };
impl super::RedisConnectionPool { impl super::RedisConnectionPool {
@ -148,7 +148,7 @@ impl super::RedisConnectionPool {
} }
#[instrument(level = "DEBUG", skip(self))] #[instrument(level = "DEBUG", skip(self))]
pub async fn delete_key(&self, key: &str) -> CustomResult<(), errors::RedisError> { pub async fn delete_key(&self, key: &str) -> CustomResult<DelReply, errors::RedisError> {
self.pool self.pool
.del(key) .del(key)
.await .await
@ -664,30 +664,81 @@ impl super::RedisConnectionPool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#![allow(clippy::unwrap_used)] #![allow(clippy::expect_used, clippy::unwrap_used)]
use crate::{errors::RedisError, RedisConnectionPool, RedisEntryId, RedisSettings}; use crate::{errors::RedisError, RedisConnectionPool, RedisEntryId, RedisSettings};
#[tokio::test] #[tokio::test]
async fn test_consumer_group_create() { async fn test_consumer_group_create() {
let redis_conn = RedisConnectionPool::new(&RedisSettings::default()) let is_invalid_redis_entry_error = tokio::task::spawn_blocking(move || {
.await futures::executor::block_on(async {
.unwrap(); // Arrange
let redis_conn = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let result1 = redis_conn // Act
.consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID) let result1 = redis_conn
.await; .consumer_group_create("TEST1", "GTEST", &RedisEntryId::AutoGeneratedID)
let result2 = redis_conn .await;
.consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID)
.await;
assert!(matches!( let result2 = redis_conn
result1.unwrap_err().current_context(), .consumer_group_create("TEST3", "GTEST", &RedisEntryId::UndeliveredEntryID)
RedisError::InvalidRedisEntryId .await;
));
assert!(matches!( // Assert Setup
result2.unwrap_err().current_context(), *result1.unwrap_err().current_context() == RedisError::InvalidRedisEntryId
RedisError::InvalidRedisEntryId && *result2.unwrap_err().current_context() == RedisError::InvalidRedisEntryId
)); })
})
.await
.expect("Spawn block failure");
assert!(is_invalid_redis_entry_error);
}
#[tokio::test]
async fn test_delete_existing_key_success() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
let _ = pool.set_key("key", "value".to_string()).await;
// Act
let result = pool.delete_key("key").await;
// Assert setup
result.is_ok()
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
}
#[tokio::test]
async fn test_delete_non_existing_key_success() {
let is_success = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async {
// Arrange
let pool = RedisConnectionPool::new(&RedisSettings::default())
.await
.expect("failed to create redis connection pool");
// Act
let result = pool.delete_key("key not exists").await;
// Assert Setup
result.is_ok()
})
})
.await
.expect("Spawn block failure");
assert!(is_success);
} }
} }

View File

@ -2,7 +2,7 @@
//! Errors specific to this custom redis interface //! Errors specific to this custom redis interface
//! //!
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error, PartialEq)]
pub enum RedisError { pub enum RedisError {
#[error("Invalid Redis configuration: {0}")] #[error("Invalid Redis configuration: {0}")]
InvalidConfiguration(String), InvalidConfiguration(String),

View File

@ -226,3 +226,22 @@ impl From<StreamCapTrim> for fred::types::XCapTrim {
} }
} }
} }
#[derive(Debug)]
pub enum DelReply {
KeyDeleted,
KeyNotDeleted, // Key not found
}
impl fred::types::FromRedis for DelReply {
fn from_value(value: fred::types::RedisValue) -> Result<Self, fred::error::RedisError> {
match value {
fred::types::RedisValue::Integer(1) => Ok(Self::KeyDeleted),
fred::types::RedisValue::Integer(0) => Ok(Self::KeyNotDeleted),
_ => Err(fred::error::RedisError::new(
fred::error::RedisErrorKind::Unknown,
"Unexpected del command reply",
)),
}
}
}

View File

@ -117,6 +117,10 @@ impl Cache {
let val = self.get(key)?; let val = self.get(key)?;
(*val).as_any().downcast_ref::<T>().cloned() (*val).as_any().downcast_ref::<T>().cloned()
} }
pub async fn remove(&self, key: &str) {
self.invalidate(key).await;
}
} }
#[cfg(test)] #[cfg(test)]
@ -130,6 +134,23 @@ mod cache_tests {
assert_eq!(cache.get_val::<String>("key"), Some(String::from("val"))); assert_eq!(cache.get_val::<String>("key"), Some(String::from("val")));
} }
#[tokio::test]
async fn eviction_on_size_test() {
let cache = Cache::new(2, 2, Some(0));
cache.push("key".to_string(), "val".to_string()).await;
assert_eq!(cache.get_val::<String>("key"), None);
}
#[tokio::test]
async fn invalidate_cache_for_key() {
let cache = Cache::new(1800, 1800, None);
cache.push("key".to_string(), "val".to_string()).await;
cache.remove("key").await;
assert_eq!(cache.get_val::<String>("key"), None);
}
#[tokio::test] #[tokio::test]
async fn eviction_on_time_test() { async fn eviction_on_time_test() {
let cache = Cache::new(2, 2, None); let cache = Cache::new(2, 2, None);
@ -137,11 +158,4 @@ mod cache_tests {
tokio::time::sleep(std::time::Duration::from_secs(3)).await; tokio::time::sleep(std::time::Duration::from_secs(3)).await;
assert_eq!(cache.get_val::<String>("key"), None); assert_eq!(cache.get_val::<String>("key"), None);
} }
#[tokio::test]
async fn eviction_on_size_test() {
let cache = Cache::new(2, 2, Some(0));
cache.push("key".to_string(), "val".to_string()).await;
assert_eq!(cache.get_val::<String>("key"), None);
}
} }

View File

@ -1,5 +1,6 @@
pub mod admin; pub mod admin;
pub mod api_keys; pub mod api_keys;
pub mod cache;
pub mod cards_info; pub mod cards_info;
pub mod configs; pub mod configs;
pub mod customers; pub mod customers;

View File

@ -0,0 +1,25 @@
use common_utils::errors::CustomResult;
use redis_interface::DelReply;
use super::errors;
use crate::{
cache::{ACCOUNTS_CACHE, CONFIG_CACHE},
db::StorageInterface,
services,
};
pub async fn invalidate(
store: &dyn StorageInterface,
key: &str,
) -> CustomResult<services::api::ApplicationResponse<serde_json::Value>, errors::ApiErrorResponse> {
CONFIG_CACHE.remove(key).await;
ACCOUNTS_CACHE.remove(key).await;
match store.get_redis_conn().delete_key(key).await {
Ok(DelReply::KeyDeleted) => Ok(services::api::ApplicationResponse::StatusOk),
Ok(DelReply::KeyNotDeleted) => Err(errors::ApiErrorResponse::InvalidRequestUrl.into()),
Err(error) => Err(error
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to invalidate cache")),
}
}

View File

@ -109,7 +109,7 @@ impl QueueInterface for Store {
async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult<bool, RedisError> { async fn release_pt_lock(&self, tag: &str, lock_key: &str) -> CustomResult<bool, RedisError> {
let is_lock_released = self.redis_conn()?.delete_key(lock_key).await; let is_lock_released = self.redis_conn()?.delete_key(lock_key).await;
Ok(match is_lock_released { Ok(match is_lock_released {
Ok(()) => true, Ok(_del_reply) => true,
Err(error) => { Err(error) => {
logger::error!(error=%error.current_context(), %tag, "Error while releasing lock"); logger::error!(error=%error.current_context(), %tag, "Error while releasing lock");
false false

View File

@ -138,7 +138,9 @@ pub fn mk_app(
server_app = server_app.service(routes::StripeApis::server(state.clone())); server_app = server_app.service(routes::StripeApis::server(state.clone()));
} }
server_app = server_app.service(routes::Cards::server(state.clone())); server_app = server_app.service(routes::Cards::server(state.clone()));
server_app = server_app.service(routes::Cache::server(state.clone()));
server_app = server_app.service(routes::Health::server(state)); server_app = server_app.service(routes::Health::server(state));
server_app server_app
} }

View File

@ -1,6 +1,7 @@
pub mod admin; pub mod admin;
pub mod api_keys; pub mod api_keys;
pub mod app; pub mod app;
pub mod cache;
pub mod cards_info; pub mod cards_info;
pub mod configs; pub mod configs;
pub mod customers; pub mod customers;
@ -21,9 +22,9 @@ pub mod webhooks;
#[cfg(feature = "dummy_connector")] #[cfg(feature = "dummy_connector")]
pub use self::app::DummyConnector; pub use self::app::DummyConnector;
pub use self::app::{ pub use self::app::{
ApiKeys, AppState, Cards, Configs, Customers, Disputes, EphemeralKey, Files, Health, Mandates, ApiKeys, AppState, Cache, Cards, Configs, Customers, Disputes, EphemeralKey, Files, Health,
MerchantAccount, MerchantConnectorAccount, PaymentMethods, Payments, Payouts, Refunds, Mandates, MerchantAccount, MerchantConnectorAccount, PaymentMethods, Payments, Payouts,
Webhooks, Refunds, Webhooks,
}; };
#[cfg(feature = "stripe")] #[cfg(feature = "stripe")]
pub use super::compatibility::stripe::StripeApis; pub use super::compatibility::stripe::StripeApis;

View File

@ -5,9 +5,9 @@ use tokio::sync::oneshot;
#[cfg(feature = "dummy_connector")] #[cfg(feature = "dummy_connector")]
use super::dummy_connector::*; use super::dummy_connector::*;
use super::health::*;
#[cfg(feature = "olap")] #[cfg(feature = "olap")]
use super::{admin::*, api_keys::*, disputes::*, files::*}; use super::{admin::*, api_keys::*, disputes::*, files::*};
use super::{cache::*, health::*};
#[cfg(any(feature = "olap", feature = "oltp"))] #[cfg(any(feature = "olap", feature = "oltp"))]
use super::{configs::*, customers::*, mandates::*, payments::*, payouts::*, refunds::*}; use super::{configs::*, customers::*, mandates::*, payments::*, payouts::*, refunds::*};
#[cfg(feature = "oltp")] #[cfg(feature = "oltp")]
@ -424,7 +424,6 @@ impl Configs {
pub fn server(config: AppState) -> Scope { pub fn server(config: AppState) -> Scope {
web::scope("/configs") web::scope("/configs")
.app_data(web::Data::new(config)) .app_data(web::Data::new(config))
.service(web::resource("/").route(web::post().to(config_key_create)))
.service( .service(
web::resource("/{key}") web::resource("/{key}")
.route(web::get().to(config_key_retrieve)) .route(web::get().to(config_key_retrieve))
@ -465,10 +464,6 @@ impl Disputes {
.route(web::post().to(submit_dispute_evidence)) .route(web::post().to(submit_dispute_evidence))
.route(web::put().to(attach_dispute_evidence)), .route(web::put().to(attach_dispute_evidence)),
) )
.service(
web::resource("/evidence/{dispute_id}")
.route(web::get().to(retrieve_dispute_evidence)),
)
.service(web::resource("/{dispute_id}").route(web::get().to(retrieve_dispute))) .service(web::resource("/{dispute_id}").route(web::get().to(retrieve_dispute)))
} }
} }
@ -498,3 +493,13 @@ impl Files {
) )
} }
} }
pub struct Cache;
impl Cache {
pub fn server(state: AppState) -> Scope {
web::scope("/cache")
.app_data(web::Data::new(state))
.service(web::resource("/invalidate/{key}").route(web::post().to(invalidate)))
}
}

View File

@ -0,0 +1,29 @@
use actix_web::{web, HttpRequest, Responder};
use router_env::{instrument, tracing, Flow};
use super::AppState;
use crate::{
core::cache,
services::{api, authentication as auth},
};
#[instrument(skip_all)]
pub async fn invalidate(
state: web::Data<AppState>,
req: HttpRequest,
key: web::Path<String>,
) -> impl Responder {
let flow = Flow::CacheInvalidate;
let key = key.into_inner().to_owned();
api::server_wrap(
flow,
state.get_ref(),
&req,
&key,
|state, _, key| cache::invalidate(&*state.store, key),
&auth::AdminApiAuth,
)
.await
}

View File

@ -0,0 +1,76 @@
#![allow(clippy::unwrap_used)]
use router::{
cache::{self},
configs::settings::Settings,
routes,
};
mod utils;
#[actix_web::test]
async fn invalidate_existing_cache_success() {
// Arrange
utils::setup().await;
let (tx, _) = tokio::sync::oneshot::channel();
let state = routes::AppState::new(Settings::default(), tx).await;
let cache_key = "cacheKey".to_string();
let cache_key_value = "val".to_string();
let _ = state
.store
.get_redis_conn()
.set_key(&cache_key.clone(), cache_key_value.clone())
.await;
let api_key = ("api-key", "test_admin");
let client = awc::Client::default();
cache::CONFIG_CACHE
.push(cache_key.clone(), cache_key_value.clone())
.await;
cache::ACCOUNTS_CACHE
.push(cache_key.clone(), cache_key_value.clone())
.await;
// Act
let mut response = client
.post(format!(
"http://127.0.0.1:8080/cache/invalidate/{cache_key}"
))
.insert_header(api_key)
.send()
.await
.unwrap();
// Assert
let response_body = response.body().await;
println!("invalidate Cache: {response:?} : {response_body:?}");
assert_eq!(response.status(), awc::http::StatusCode::OK);
assert!(cache::CONFIG_CACHE.get(&cache_key).is_none());
assert!(cache::ACCOUNTS_CACHE.get(&cache_key).is_none());
}
#[actix_web::test]
async fn invalidate_non_existing_cache_success() {
// Arrange
utils::setup().await;
let cache_key = "cacheKey".to_string();
let api_key = ("api-key", "test_admin");
let client = awc::Client::default();
// Act
let mut response = client
.post(format!(
"http://127.0.0.1:8080/cache/invalidate/{cache_key}"
))
.insert_header(api_key)
.send()
.await
.unwrap();
// Assert
let response_body = response.body().await;
println!("invalidate Cache: {response:?} : {response_body:?}");
assert_eq!(response.status(), awc::http::StatusCode::NOT_FOUND);
}

View File

@ -184,6 +184,8 @@ pub enum Flow {
AttachDisputeEvidence, AttachDisputeEvidence,
/// Retrieve Dispute Evidence flow /// Retrieve Dispute Evidence flow
RetrieveDisputeEvidence, RetrieveDisputeEvidence,
/// Invalidate cache flow
CacheInvalidate,
} }
/// ///