diff --git a/crates/router/src/consts.rs b/crates/router/src/consts.rs index 6d9da8f7bb..610bc7ca28 100644 --- a/crates/router/src/consts.rs +++ b/crates/router/src/consts.rs @@ -50,7 +50,6 @@ pub(crate) const BASE64_ENGINE_URL_SAFE: base64::engine::GeneralPurpose = base64::engine::general_purpose::URL_SAFE; pub(crate) const API_KEY_LENGTH: usize = 64; -pub(crate) const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate"; // Apple Pay validation url pub(crate) const APPLEPAY_VALIDATION_URL: &str = diff --git a/crates/router/src/core/cache.rs b/crates/router/src/core/cache.rs index a8ca8395a6..8f3cf765e9 100644 --- a/crates/router/src/core/cache.rs +++ b/crates/router/src/core/cache.rs @@ -1,18 +1,21 @@ use common_utils::errors::CustomResult; use error_stack::{report, ResultExt}; -use storage_impl::redis::cache::CacheKind; +use storage_impl::redis::cache::{publish_into_redact_channel, CacheKind}; use super::errors; -use crate::{db::cache::publish_into_redact_channel, routes::AppState, services}; +use crate::{routes::AppState, services}; pub async fn invalidate( state: AppState, key: &str, ) -> CustomResult, errors::ApiErrorResponse> { let store = state.store.as_ref(); - let result = publish_into_redact_channel(store, [CacheKind::All(key.into())]) - .await - .change_context(errors::ApiErrorResponse::InternalServerError)?; + let result = publish_into_redact_channel( + store.get_cache_store().as_ref(), + [CacheKind::All(key.into())], + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError)?; // If the message was published to atleast one channel // then return status Ok diff --git a/crates/router/src/core/payment_methods/surcharge_decision_configs.rs b/crates/router/src/core/payment_methods/surcharge_decision_configs.rs index 88ae82cad3..76881950e3 100644 --- a/crates/router/src/core/payment_methods/surcharge_decision_configs.rs +++ b/crates/router/src/core/payment_methods/surcharge_decision_configs.rs @@ -13,27 +13,26 @@ use euclid::{ }; use router_env::{instrument, tracing}; -use crate::{ - core::payments::{types, PaymentData}, - db::StorageInterface, - types::{ - storage::{self as oss_storage, payment_attempt::PaymentAttemptExt}, - transformers::ForeignTryFrom, - }, -}; -static CONF_CACHE: StaticCache = StaticCache::new(); use crate::{ core::{ errors::ConditionalConfigError as ConfigError, payments::{ conditional_configs::ConditionalConfigResult, routing::make_dsl_input_for_surcharge, + types, PaymentData, }, }, + db::StorageInterface, + types::{ + storage::{self, payment_attempt::PaymentAttemptExt}, + transformers::ForeignTryFrom, + }, AppState, }; +static CONF_CACHE: StaticCache = StaticCache::new(); + struct VirInterpreterBackendCacheWrapper { - cached_alogorith: backend::VirInterpreterBackend, + cached_algorithm: backend::VirInterpreterBackend, merchant_surcharge_configs: surcharge_decision_configs::MerchantSurchargeConfigs, } @@ -41,12 +40,12 @@ impl TryFrom for VirInterpreterBackendCacheWrapp type Error = error_stack::Report; fn try_from(value: SurchargeDecisionManagerRecord) -> Result { - let cached_alogorith = backend::VirInterpreterBackend::with_program(value.algorithm) + let cached_algorithm = backend::VirInterpreterBackend::with_program(value.algorithm) .change_context(ConfigError::DslBackendInitError) .attach_printable("Error initializing DSL interpreter backend")?; let merchant_surcharge_configs = value.merchant_surcharge_configs; Ok(Self { - cached_alogorith, + cached_algorithm, merchant_surcharge_configs, }) } @@ -63,14 +62,14 @@ impl SurchargeSource { pub fn generate_surcharge_details_and_populate_surcharge_metadata( &self, backend_input: &backend::BackendInput, - payment_attempt: &oss_storage::PaymentAttempt, + payment_attempt: &storage::PaymentAttempt, surcharge_metadata_and_key: (&mut types::SurchargeMetadata, types::SurchargeKey), ) -> ConditionalConfigResult> { match self { Self::Generate(interpreter) => { let surcharge_output = execute_dsl_and_get_conditional_config( backend_input.clone(), - &interpreter.cached_alogorith, + &interpreter.cached_algorithm, )?; Ok(surcharge_output .surcharge_details @@ -98,8 +97,8 @@ impl SurchargeSource { pub async fn perform_surcharge_decision_management_for_payment_method_list( state: &AppState, algorithm_ref: routing::RoutingAlgorithmRef, - payment_attempt: &oss_storage::PaymentAttempt, - payment_intent: &oss_storage::PaymentIntent, + payment_attempt: &storage::PaymentAttempt, + payment_intent: &storage::PaymentIntent, billing_address: Option, response_payment_method_types: &mut [api_models::payment_methods::ResponsePaymentMethodsEnabled], ) -> ConditionalConfigResult<( @@ -279,8 +278,8 @@ where pub async fn perform_surcharge_decision_management_for_saved_cards( state: &AppState, algorithm_ref: routing::RoutingAlgorithmRef, - payment_attempt: &oss_storage::PaymentAttempt, - payment_intent: &oss_storage::PaymentIntent, + payment_attempt: &storage::PaymentAttempt, + payment_intent: &storage::PaymentIntent, customer_payment_method_list: &mut [api_models::payment_methods::CustomerPaymentMethod], ) -> ConditionalConfigResult { let mut surcharge_metadata = types::SurchargeMetadata::new(payment_attempt.attempt_id.clone()); @@ -348,7 +347,7 @@ pub async fn perform_surcharge_decision_management_for_saved_cards( fn get_surcharge_details_from_surcharge_output( surcharge_details: surcharge_decision_configs::SurchargeDetailsOutput, - payment_attempt: &oss_storage::PaymentAttempt, + payment_attempt: &storage::PaymentAttempt, ) -> ConditionalConfigResult { let surcharge_amount = match surcharge_details.surcharge.clone() { surcharge_decision_configs::SurchargeOutput::Fixed { amount } => amount, diff --git a/crates/router/src/core/routing/helpers.rs b/crates/router/src/core/routing/helpers.rs index 1d46e54bcc..2e1032e8e4 100644 --- a/crates/router/src/core/routing/helpers.rs +++ b/crates/router/src/core/routing/helpers.rs @@ -10,11 +10,11 @@ use diesel_models::{ }; use error_stack::ResultExt; use rustc_hash::FxHashSet; -use storage_impl::redis::cache as redis_cache; +use storage_impl::redis::cache; use crate::{ core::errors::{self, RouterResult}, - db::{cache, StorageInterface}, + db::StorageInterface, types::{domain, storage}, utils::StringExt, }; @@ -245,12 +245,11 @@ pub async fn update_business_profile_active_algorithm_ref( #[cfg(feature = "business_profile_routing")] let profile_id = current_business_profile.profile_id.clone(); #[cfg(feature = "business_profile_routing")] - let routing_cache_key = redis_cache::CacheKind::Routing( - format!("routing_config_{merchant_id}_{profile_id}").into(), - ); + let routing_cache_key = + cache::CacheKind::Routing(format!("routing_config_{merchant_id}_{profile_id}").into()); #[cfg(not(feature = "business_profile_routing"))] - let routing_cache_key = redis_cache::CacheKind::Routing(format!("dsl_{merchant_id}").into()); + let routing_cache_key = cache::CacheKind::Routing(format!("dsl_{merchant_id}").into()); let (routing_algorithm, payout_routing_algorithm) = match transaction_type { storage::enums::TransactionType::Payment => (Some(ref_val), None), #[cfg(feature = "payouts")] @@ -285,7 +284,7 @@ pub async fn update_business_profile_active_algorithm_ref( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to update routing algorithm ref in business profile")?; - cache::publish_into_redact_channel(db, [routing_cache_key]) + cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [routing_cache_key]) .await .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to invalidate routing cache")?; diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 6ba5b601a3..e0d7d999fe 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -6,7 +6,6 @@ pub mod blocklist; pub mod blocklist_fingerprint; pub mod blocklist_lookup; pub mod business_profile; -pub mod cache; pub mod capture; pub mod cards_info; pub mod configs; @@ -124,6 +123,8 @@ pub trait StorageInterface: + 'static { fn get_scheduler_db(&self) -> Box; + + fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)>; } pub trait MasterKeyInterface { @@ -151,6 +152,10 @@ impl StorageInterface for Store { fn get_scheduler_db(&self) -> Box { Box::new(self.clone()) } + + fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> { + Box::new(self.clone()) + } } #[async_trait::async_trait] @@ -158,6 +163,10 @@ impl StorageInterface for MockDb { fn get_scheduler_db(&self) -> Box { Box::new(self.clone()) } + + fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> { + Box::new(self.clone()) + } } pub trait RequestIdStore { diff --git a/crates/router/src/db/api_keys.rs b/crates/router/src/db/api_keys.rs index a23b3f0f7a..5d2f1c57a2 100644 --- a/crates/router/src/db/api_keys.rs +++ b/crates/router/src/db/api_keys.rs @@ -1,9 +1,7 @@ use error_stack::report; use router_env::{instrument, tracing}; #[cfg(feature = "accounts_cache")] -use storage_impl::redis::cache::CacheKind; -#[cfg(feature = "accounts_cache")] -use storage_impl::redis::cache::ACCOUNTS_CACHE; +use storage_impl::redis::cache::{self, CacheKind, ACCOUNTS_CACHE}; use super::{MockDb, Store}; use crate::{ @@ -104,7 +102,7 @@ impl ApiKeyInterface for Store { "ApiKey of {_key_id} not found" ))))?; - super::cache::publish_and_redact( + cache::publish_and_redact( self, CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), update_call, @@ -146,7 +144,7 @@ impl ApiKeyInterface for Store { "ApiKey of {key_id} not found" ))))?; - super::cache::publish_and_redact( + cache::publish_and_redact( self, CacheKind::Accounts(api_key.hashed_api_key.into_inner().into()), delete_call, @@ -187,7 +185,7 @@ impl ApiKeyInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_in_memory( + cache::get_or_populate_in_memory( self, &_hashed_api_key.into_inner(), find_call, @@ -375,14 +373,14 @@ impl ApiKeyInterface for MockDb { #[cfg(test)] mod tests { use storage_impl::redis::{ - cache::{CacheKind, ACCOUNTS_CACHE}, + cache::{self, CacheKind, ACCOUNTS_CACHE}, kv_store::RedisConnInterface, pub_sub::PubSubInterface, }; use time::macros::datetime; use crate::{ - db::{api_keys::ApiKeyInterface, cache, MockDb}, + db::{api_keys::ApiKeyInterface, MockDb}, types::storage, }; diff --git a/crates/router/src/db/cache.rs b/crates/router/src/db/cache.rs deleted file mode 100644 index ba0aae55e8..0000000000 --- a/crates/router/src/db/cache.rs +++ /dev/null @@ -1,162 +0,0 @@ -use common_utils::ext_traits::AsyncExt; -use error_stack::ResultExt; -use redis_interface::errors::RedisError; -use router_env::{instrument, tracing}; -use storage_impl::redis::{ - cache::{Cache, CacheKind, Cacheable}, - pub_sub::PubSubInterface, -}; - -use super::StorageInterface; -use crate::{ - consts, - core::errors::{self, CustomResult}, -}; - -#[instrument(skip_all)] -pub async fn get_or_populate_redis( - store: &dyn StorageInterface, - key: impl AsRef, - fun: F, -) -> CustomResult -where - T: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, -{ - let type_name = std::any::type_name::(); - let key = key.as_ref(); - let redis = &store - .get_redis_conn() - .change_context(errors::StorageError::RedisError( - RedisError::RedisConnectionError.into(), - )) - .attach_printable("Failed to get redis connection")?; - let redis_val = redis.get_and_deserialize_key::(key, type_name).await; - let get_data_set_redis = || async { - let data = fun().await?; - redis - .serialize_and_set_key(key, &data) - .await - .change_context(errors::StorageError::KVError)?; - Ok::<_, error_stack::Report>(data) - }; - match redis_val { - Err(err) => match err.current_context() { - RedisError::NotFound | RedisError::JsonDeserializationFailed => { - get_data_set_redis().await - } - _ => Err(err - .change_context(errors::StorageError::KVError) - .attach_printable(format!("Error while fetching cache for {type_name}"))), - }, - Ok(val) => Ok(val), - } -} - -#[instrument(skip_all)] -pub async fn get_or_populate_in_memory( - store: &dyn StorageInterface, - key: &str, - fun: F, - cache: &Cache, -) -> CustomResult -where - T: Cacheable + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + Clone, - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, -{ - let cache_val = cache.get_val::(key).await; - if let Some(val) = cache_val { - Ok(val) - } else { - let val = get_or_populate_redis(store, key, fun).await?; - cache.push(key.to_string(), val.clone()).await; - Ok(val) - } -} - -#[instrument(skip_all)] -pub async fn redact_cache( - store: &dyn StorageInterface, - key: &str, - fun: F, - in_memory: Option<&Cache>, -) -> CustomResult -where - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, -{ - let data = fun().await?; - in_memory.async_map(|cache| cache.remove(key)).await; - - let redis_conn = store - .get_redis_conn() - .change_context(errors::StorageError::RedisError( - RedisError::RedisConnectionError.into(), - )) - .attach_printable("Failed to get redis connection")?; - - redis_conn - .delete_key(key) - .await - .change_context(errors::StorageError::KVError)?; - Ok(data) -} - -#[instrument(skip_all)] -pub async fn publish_into_redact_channel<'a, K: IntoIterator> + Send>( - store: &dyn StorageInterface, - keys: K, -) -> CustomResult { - let redis_conn = store - .get_redis_conn() - .change_context(errors::StorageError::RedisError( - RedisError::RedisConnectionError.into(), - )) - .attach_printable("Failed to get redis connection")?; - - let futures = keys.into_iter().map(|key| async { - redis_conn - .clone() - .publish(consts::PUB_SUB_CHANNEL, key) - .await - .change_context(errors::StorageError::KVError) - }); - - Ok(futures::future::try_join_all(futures) - .await? - .iter() - .sum::()) -} - -#[instrument(skip_all)] -pub async fn publish_and_redact<'a, T, F, Fut>( - store: &dyn StorageInterface, - key: CacheKind<'a>, - fun: F, -) -> CustomResult -where - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, -{ - let data = fun().await?; - publish_into_redact_channel(store, [key]).await?; - Ok(data) -} - -#[instrument(skip_all)] -pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>( - store: &dyn StorageInterface, - keys: K, - fun: F, -) -> CustomResult -where - F: FnOnce() -> Fut + Send, - Fut: futures::Future> + Send, - K: IntoIterator> + Send, -{ - let data = fun().await?; - publish_into_redact_channel(store, keys).await?; - Ok(data) -} diff --git a/crates/router/src/db/configs.rs b/crates/router/src/db/configs.rs index 63c90ac8ca..fb399a7084 100644 --- a/crates/router/src/db/configs.rs +++ b/crates/router/src/db/configs.rs @@ -2,14 +2,14 @@ use diesel_models::configs::ConfigUpdateInternal; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; use storage_impl::redis::{ - cache::{CacheKind, CONFIG_CACHE}, + cache::{self, CacheKind, CONFIG_CACHE}, kv_store::RedisConnInterface, pub_sub::PubSubInterface, }; -use super::{cache, MockDb, Store}; +use super::{MockDb, Store}; use crate::{ - connection, consts, + connection, core::errors::{self, CustomResult}, types::storage, }; @@ -72,7 +72,7 @@ impl ConfigInterface for Store { self.get_redis_conn() .map_err(Into::::into)? .publish( - consts::PUB_SUB_CHANNEL, + cache::PUB_SUB_CHANNEL, CacheKind::Config((&inserted.key).into()), ) .await @@ -179,7 +179,7 @@ impl ConfigInterface for Store { self.get_redis_conn() .map_err(Into::::into)? - .publish(consts::PUB_SUB_CHANNEL, CacheKind::Config(key.into())) + .publish(cache::PUB_SUB_CHANNEL, CacheKind::Config(key.into())) .await .map_err(Into::::into)?; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 88579892d2..a2aed29236 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -2285,6 +2285,10 @@ impl StorageInterface for KafkaStore { fn get_scheduler_db(&self) -> Box { Box::new(self.clone()) } + + fn get_cache_store(&self) -> Box<(dyn RedisConnInterface + Send + Sync + 'static)> { + Box::new(self.clone()) + } } #[async_trait::async_trait] diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 0cecb5e8d7..9b82ef414c 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -6,7 +6,7 @@ use diesel_models::MerchantAccountUpdateInternal; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; #[cfg(feature = "accounts_cache")] -use storage_impl::redis::cache::{CacheKind, ACCOUNTS_CACHE}; +use storage_impl::redis::cache::{self, CacheKind, ACCOUNTS_CACHE}; use super::{MasterKeyInterface, MockDb, Store}; use crate::{ @@ -128,7 +128,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_in_memory(self, merchant_id, fetch_func, &ACCOUNTS_CACHE) + cache::get_or_populate_in_memory(self, merchant_id, fetch_func, &ACCOUNTS_CACHE) .await? .convert(merchant_key_store.key.get_inner()) .await @@ -209,7 +209,7 @@ impl MerchantAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - merchant_account = super::cache::get_or_populate_in_memory( + merchant_account = cache::get_or_populate_in_memory( self, publishable_key, fetch_by_pub_key_func, @@ -537,7 +537,7 @@ async fn publish_and_redact_merchant_account_cache( cache_keys.extend(publishable_key.into_iter()); cache_keys.extend(kgraph_key.into_iter()); - super::cache::publish_into_redact_channel(store, cache_keys).await?; + cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?; Ok(()) } @@ -556,6 +556,6 @@ async fn publish_and_redact_all_merchant_account_cache( .map(|s| CacheKind::Accounts(s.into())) .collect(); - super::cache::publish_into_redact_channel(store, cache_keys).await?; + cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?; Ok(()) } diff --git a/crates/router/src/db/merchant_connector_account.rs b/crates/router/src/db/merchant_connector_account.rs index 72199c20e0..be37fd9fd6 100644 --- a/crates/router/src/db/merchant_connector_account.rs +++ b/crates/router/src/db/merchant_connector_account.rs @@ -203,7 +203,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_in_memory( + cache::get_or_populate_in_memory( self, &format!("{}_{}", merchant_id, connector_label), find_call, @@ -248,7 +248,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_in_memory( + cache::get_or_populate_in_memory( self, &format!("{}_{}", profile_id, connector_name), find_call, @@ -322,7 +322,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { - super::cache::get_or_populate_in_memory( + cache::get_or_populate_in_memory( self, &format!("{}_{}", merchant_id, merchant_connector_id), find_call, @@ -418,7 +418,7 @@ impl MerchantConnectorAccountInterface for Store { #[cfg(feature = "accounts_cache")] { // Redact both the caches as any one or both might be used because of backwards compatibility - super::cache::publish_and_redact_multiple( + cache::publish_and_redact_multiple( self, [ cache::CacheKind::Accounts( @@ -481,7 +481,7 @@ impl MerchantConnectorAccountInterface for Store { "profile_id".to_string(), ))?; - super::cache::publish_and_redact_multiple( + cache::publish_and_redact_multiple( self, [ cache::CacheKind::Accounts( @@ -766,6 +766,7 @@ impl MerchantConnectorAccountInterface for MockDb { } } +#[cfg(feature = "accounts_cache")] #[cfg(test)] mod merchant_connector_account_cache_tests { use api_models::enums::CountryAlpha2; @@ -774,7 +775,7 @@ mod merchant_connector_account_cache_tests { use error_stack::ResultExt; use masking::PeekInterface; use storage_impl::redis::{ - cache::{CacheKind, ACCOUNTS_CACHE}, + cache::{self, CacheKind, ACCOUNTS_CACHE}, kv_store::RedisConnInterface, pub_sub::PubSubInterface, }; @@ -783,7 +784,7 @@ mod merchant_connector_account_cache_tests { use crate::{ core::errors, db::{ - cache, merchant_connector_account::MerchantConnectorAccountInterface, + merchant_connector_account::MerchantConnectorAccountInterface, merchant_key_store::MerchantKeyStoreInterface, MasterKeyInterface, MockDb, }, services, diff --git a/crates/router/src/db/merchant_key_store.rs b/crates/router/src/db/merchant_key_store.rs index f857ab9d61..77eb260983 100644 --- a/crates/router/src/db/merchant_key_store.rs +++ b/crates/router/src/db/merchant_key_store.rs @@ -2,7 +2,7 @@ use error_stack::{report, ResultExt}; use masking::Secret; use router_env::{instrument, tracing}; #[cfg(feature = "accounts_cache")] -use storage_impl::redis::cache::{CacheKind, ACCOUNTS_CACHE}; +use storage_impl::redis::cache::{self, CacheKind, ACCOUNTS_CACHE}; use crate::{ connection, @@ -92,7 +92,7 @@ impl MerchantKeyStoreInterface for Store { #[cfg(feature = "accounts_cache")] { let key_store_cache_key = format!("merchant_key_store_{}", merchant_id); - super::cache::get_or_populate_in_memory( + cache::get_or_populate_in_memory( self, &key_store_cache_key, fetch_func, @@ -128,7 +128,7 @@ impl MerchantKeyStoreInterface for Store { #[cfg(feature = "accounts_cache")] { let key_store_cache_key = format!("merchant_key_store_{}", merchant_id); - super::cache::publish_and_redact( + cache::publish_and_redact( self, CacheKind::Accounts(key_store_cache_key.into()), delete_func, diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 6d9d657e10..bb50e27b8c 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -22,7 +22,7 @@ use storage_impl::RouterStore; use tokio::sync::oneshot; pub use self::{api::*, encryption::*}; -use crate::{configs::Settings, consts, core::errors}; +use crate::{configs::Settings, core::errors}; #[cfg(not(feature = "olap"))] pub type StoreType = storage_impl::database::store::Store; @@ -68,7 +68,7 @@ pub async fn get_store( &config.redis, master_enc_key, shut_down_signal, - consts::PUB_SUB_CHANNEL, + storage_impl::redis::cache::PUB_SUB_CHANNEL, ) .await? }; diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 0963cd2b41..4ee66826b2 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -4,7 +4,7 @@ use diesel_models as store; use error_stack::ResultExt; use hyperswitch_domain_models::errors::{StorageError, StorageResult}; use masking::StrongSecret; -use redis::{kv_store::RedisConnInterface, RedisStore}; +use redis::{kv_store::RedisConnInterface, pub_sub::PubSubInterface, RedisStore}; mod address; pub mod config; pub mod connection; @@ -105,7 +105,8 @@ impl RouterStore { .attach_printable("Failed to create cache store")?; cache_store.set_error_callback(cache_error_signal); cache_store - .subscribe_to_channel(inmemory_cache_stream) + .redis_conn + .subscribe(inmemory_cache_stream) .await .change_context(StorageError::InitializationError) .attach_printable("Failed to subscribe to inmemory cache stream")?; diff --git a/crates/storage_impl/src/redis.rs b/crates/storage_impl/src/redis.rs index d431b45829..acafdb94ce 100644 --- a/crates/storage_impl/src/redis.rs +++ b/crates/storage_impl/src/redis.rs @@ -4,9 +4,7 @@ pub mod pub_sub; use std::sync::{atomic, Arc}; -use error_stack::ResultExt; -use redis_interface::PubsubInterface; -use router_env::{logger, tracing::Instrument}; +use router_env::tracing::Instrument; use self::{kv_store::RedisConnInterface, pub_sub::PubSubInterface}; @@ -42,30 +40,6 @@ impl RedisStore { .in_current_span(), ); } - - pub async fn subscribe_to_channel( - &self, - channel: &str, - ) -> error_stack::Result<(), redis_interface::errors::RedisError> { - self.redis_conn.subscriber.manage_subscriptions(); - - self.redis_conn - .subscriber - .subscribe::<(), _>(channel) - .await - .change_context(redis_interface::errors::RedisError::SubscribeError)?; - - let redis_clone = self.redis_conn.clone(); - let _task_handle = tokio::spawn( - async move { - if let Err(e) = redis_clone.on_message().await { - logger::error!(pubsub_err=?e); - } - } - .in_current_span(), - ); - Ok(()) - } } impl RedisConnInterface for RedisStore { diff --git a/crates/storage_impl/src/redis/cache.rs b/crates/storage_impl/src/redis/cache.rs index edcbed21ad..68f04936da 100644 --- a/crates/storage_impl/src/redis/cache.rs +++ b/crates/storage_impl/src/redis/cache.rs @@ -6,14 +6,18 @@ use common_utils::{ }; use dyn_clone::DynClone; use error_stack::{Report, ResultExt}; -use hyperswitch_domain_models::errors::StorageError; use moka::future::Cache as MokaCache; use once_cell::sync::Lazy; use redis_interface::{errors::RedisError, RedisValue}; +use router_env::tracing::{self, instrument}; -use super::{kv_store::RedisConnInterface, pub_sub::PubSubInterface}; +use crate::{ + errors::StorageError, + redis::{PubSubInterface, RedisConnInterface}, +}; -pub(crate) const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate"; +/// Redis channel name used for publishing invalidation messages +pub const PUB_SUB_CHANNEL: &str = "hyperswitch_invalidate"; /// Prefix for config cache key const CONFIG_CACHE_PREFIX: &str = "config"; @@ -24,7 +28,7 @@ const ACCOUNTS_CACHE_PREFIX: &str = "accounts"; /// Prefix for routing cache key const ROUTING_CACHE_PREFIX: &str = "routing"; -/// Prefix for kgraph cache key +/// Prefix for cgraph cache key const CGRAPH_CACHE_PREFIX: &str = "cgraph"; /// Prefix for PM Filter cgraph cache key @@ -165,6 +169,7 @@ impl Cache { } } +#[instrument(skip_all)] pub async fn get_or_populate_redis( store: &(dyn RedisConnInterface + Send + Sync), key: impl AsRef, @@ -179,10 +184,9 @@ where let key = key.as_ref(); let redis = &store .get_redis_conn() - .map_err(|er| { - let error = format!("{}", er); - er.change_context(StorageError::RedisError(error)) - }) + .change_context(StorageError::RedisError( + RedisError::RedisConnectionError.into(), + )) .attach_printable("Failed to get redis connection")?; let redis_val = redis.get_and_deserialize_key::(key, type_name).await; let get_data_set_redis = || async { @@ -206,6 +210,7 @@ where } } +#[instrument(skip_all)] pub async fn get_or_populate_in_memory( store: &(dyn RedisConnInterface + Send + Sync), key: &str, @@ -227,8 +232,9 @@ where } } +#[instrument(skip_all)] pub async fn redact_cache( - store: &dyn RedisConnInterface, + store: &(dyn RedisConnInterface + Send + Sync), key: &str, fun: F, in_memory: Option<&Cache>, @@ -242,10 +248,9 @@ where let redis_conn = store .get_redis_conn() - .map_err(|er| { - let error = format!("{}", er); - er.change_context(StorageError::RedisError(error)) - }) + .change_context(StorageError::RedisError( + RedisError::RedisConnectionError.into(), + )) .attach_printable("Failed to get redis connection")?; redis_conn @@ -255,26 +260,35 @@ where Ok(data) } -pub async fn publish_into_redact_channel<'a>( - store: &dyn RedisConnInterface, - key: CacheKind<'a>, +#[instrument(skip_all)] +pub async fn publish_into_redact_channel<'a, K: IntoIterator> + Send>( + store: &(dyn RedisConnInterface + Send + Sync), + keys: K, ) -> CustomResult { let redis_conn = store .get_redis_conn() - .map_err(|er| { - let error = format!("{}", er); - er.change_context(StorageError::RedisError(error)) - }) + .change_context(StorageError::RedisError( + RedisError::RedisConnectionError.into(), + )) .attach_printable("Failed to get redis connection")?; - redis_conn - .publish(PUB_SUB_CHANNEL, key) - .await - .change_context(StorageError::KVError) + let futures = keys.into_iter().map(|key| async { + redis_conn + .clone() + .publish(PUB_SUB_CHANNEL, key) + .await + .change_context(StorageError::KVError) + }); + + Ok(futures::future::try_join_all(futures) + .await? + .iter() + .sum::()) } +#[instrument(skip_all)] pub async fn publish_and_redact<'a, T, F, Fut>( - store: &dyn RedisConnInterface, + store: &(dyn RedisConnInterface + Send + Sync), key: CacheKind<'a>, fun: F, ) -> CustomResult @@ -283,7 +297,23 @@ where Fut: futures::Future> + Send, { let data = fun().await?; - publish_into_redact_channel(store, key).await?; + publish_into_redact_channel(store, [key]).await?; + Ok(data) +} + +#[instrument(skip_all)] +pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>( + store: &(dyn RedisConnInterface + Send + Sync), + keys: K, + fun: F, +) -> CustomResult +where + F: FnOnce() -> Fut + Send, + Fut: futures::Future> + Send, + K: IntoIterator> + Send, +{ + let data = fun().await?; + publish_into_redact_channel(store, keys).await?; Ok(data) } diff --git a/crates/storage_impl/src/redis/pub_sub.rs b/crates/storage_impl/src/redis/pub_sub.rs index e4ebaf471d..7b23cb25ad 100644 --- a/crates/storage_impl/src/redis/pub_sub.rs +++ b/crates/storage_impl/src/redis/pub_sub.rs @@ -1,6 +1,6 @@ use error_stack::ResultExt; use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue}; -use router_env::logger; +use router_env::{logger, tracing::Instrument}; use crate::redis::cache::{ CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, @@ -20,7 +20,7 @@ pub trait PubSubInterface { } #[async_trait::async_trait] -impl PubSubInterface for redis_interface::RedisConnectionPool { +impl PubSubInterface for std::sync::Arc { #[inline] async fn subscribe(&self, channel: &str) -> error_stack::Result<(), redis_errors::RedisError> { // Spawns a task that will automatically re-subscribe to any channels or channel patterns used by the client. @@ -29,7 +29,18 @@ impl PubSubInterface for redis_interface::RedisConnectionPool { self.subscriber .subscribe(channel) .await - .change_context(redis_errors::RedisError::SubscribeError) + .change_context(redis_errors::RedisError::SubscribeError)?; + + let redis_clone = self.clone(); + let _task_handle = tokio::spawn( + async move { + if let Err(pubsub_error) = redis_clone.on_message().await { + logger::error!(?pubsub_error); + } + } + .in_current_span(), + ); + Ok(()) } #[inline]