mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
refactor(conditional_configs): refactor conditional_configs to use Moka Cache instead of Static Cache (#4814)
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
This commit is contained in:
@ -1,5 +1,9 @@
|
|||||||
pub mod types;
|
pub mod types;
|
||||||
|
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
backend::{self, inputs, EuclidBackend},
|
backend::{self, inputs, EuclidBackend},
|
||||||
frontend::{
|
frontend::{
|
||||||
@ -9,6 +13,7 @@ use crate::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
pub struct VirInterpreterBackend<O> {
|
pub struct VirInterpreterBackend<O> {
|
||||||
program: vir::ValuedProgram<O>,
|
program: vir::ValuedProgram<O>,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +1,15 @@
|
|||||||
//! Valued Intermediate Representation
|
//! Valued Intermediate Representation
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::types::{EuclidValue, Metadata};
|
use crate::types::{EuclidValue, Metadata};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub enum ValuedComparisonLogic {
|
pub enum ValuedComparisonLogic {
|
||||||
NegativeConjunction,
|
NegativeConjunction,
|
||||||
PositiveDisjunction,
|
PositiveDisjunction,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ValuedComparison {
|
pub struct ValuedComparison {
|
||||||
pub values: Vec<EuclidValue>,
|
pub values: Vec<EuclidValue>,
|
||||||
pub logic: ValuedComparisonLogic,
|
pub logic: ValuedComparisonLogic,
|
||||||
@ -16,20 +18,20 @@ pub struct ValuedComparison {
|
|||||||
|
|
||||||
pub type ValuedIfCondition = Vec<ValuedComparison>;
|
pub type ValuedIfCondition = Vec<ValuedComparison>;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ValuedIfStatement {
|
pub struct ValuedIfStatement {
|
||||||
pub condition: ValuedIfCondition,
|
pub condition: ValuedIfCondition,
|
||||||
pub nested: Option<Vec<ValuedIfStatement>>,
|
pub nested: Option<Vec<ValuedIfStatement>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ValuedRule<O> {
|
pub struct ValuedRule<O> {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub connector_selection: O,
|
pub connector_selection: O,
|
||||||
pub statements: Vec<ValuedIfStatement>,
|
pub statements: Vec<ValuedIfStatement>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ValuedProgram<O> {
|
pub struct ValuedProgram<O> {
|
||||||
pub default_selection: O,
|
pub default_selection: O,
|
||||||
pub rules: Vec<ValuedRule<O>>,
|
pub rules: Vec<ValuedRule<O>>,
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
pub mod transformers;
|
pub mod transformers;
|
||||||
|
|
||||||
use euclid_macros::EnumNums;
|
use euclid_macros::EnumNums;
|
||||||
use serde::Serialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use strum::VariantNames;
|
use strum::VariantNames;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -143,7 +143,7 @@ impl EuclidKey {
|
|||||||
|
|
||||||
enums::collect_variants!(EuclidKey);
|
enums::collect_variants!(EuclidKey);
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum NumValueRefinement {
|
pub enum NumValueRefinement {
|
||||||
NotEqual,
|
NotEqual,
|
||||||
@ -178,18 +178,18 @@ impl From<NumValueRefinement> for ast::ComparisonType {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize)]
|
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct StrValue {
|
pub struct StrValue {
|
||||||
pub value: String,
|
pub value: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize)]
|
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct MetadataValue {
|
pub struct MetadataValue {
|
||||||
pub key: String,
|
pub key: String,
|
||||||
pub value: String,
|
pub value: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize)]
|
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||||
pub struct NumValue {
|
pub struct NumValue {
|
||||||
pub number: i64,
|
pub number: i64,
|
||||||
pub refinement: Option<NumValueRefinement>,
|
pub refinement: Option<NumValueRefinement>,
|
||||||
@ -234,7 +234,7 @@ impl NumValue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
pub enum EuclidValue {
|
pub enum EuclidValue {
|
||||||
PaymentMethod(enums::PaymentMethod),
|
PaymentMethod(enums::PaymentMethod),
|
||||||
CardBin(StrValue),
|
CardBin(StrValue),
|
||||||
|
|||||||
@ -6,6 +6,7 @@ use common_utils::ext_traits::{Encode, StringExt, ValueExt};
|
|||||||
use diesel_models::configs;
|
use diesel_models::configs;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use euclid::frontend::ast;
|
use euclid::frontend::ast;
|
||||||
|
use storage_impl::redis::cache;
|
||||||
|
|
||||||
use super::routing::helpers::{
|
use super::routing::helpers::{
|
||||||
get_payment_config_routing_id, update_merchant_active_algorithm_ref,
|
get_payment_config_routing_id, update_merchant_active_algorithm_ref,
|
||||||
@ -99,8 +100,9 @@ pub async fn upsert_conditional_config(
|
|||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Error serializing the config")?;
|
.attach_printable("Error serializing the config")?;
|
||||||
|
|
||||||
algo_id.update_conditional_config_id(key);
|
algo_id.update_conditional_config_id(key.clone());
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::DecisionManager(key.into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update routing algorithm ref")?;
|
.attach_printable("Failed to update routing algorithm ref")?;
|
||||||
@ -134,8 +136,9 @@ pub async fn upsert_conditional_config(
|
|||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Error fetching the config")?;
|
.attach_printable("Error fetching the config")?;
|
||||||
|
|
||||||
algo_id.update_conditional_config_id(key);
|
algo_id.update_conditional_config_id(key.clone());
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::DecisionManager(key.into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update routing algorithm ref")?;
|
.attach_printable("Failed to update routing algorithm ref")?;
|
||||||
@ -164,7 +167,8 @@ pub async fn delete_conditional_config(
|
|||||||
.attach_printable("Could not decode the conditional_config algorithm")?
|
.attach_printable("Could not decode the conditional_config algorithm")?
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
algo_id.config_algo_id = None;
|
algo_id.config_algo_id = None;
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::DecisionManager(key.clone().into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update deleted algorithm ref")?;
|
.attach_printable("Failed to update deleted algorithm ref")?;
|
||||||
|
|||||||
@ -1,21 +1,21 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use api_models::{
|
use api_models::{
|
||||||
payment_methods::SurchargeDetailsResponse,
|
payment_methods::SurchargeDetailsResponse,
|
||||||
payments, routing,
|
payments, routing,
|
||||||
surcharge_decision_configs::{self, SurchargeDecisionConfigs, SurchargeDecisionManagerRecord},
|
surcharge_decision_configs::{self, SurchargeDecisionConfigs, SurchargeDecisionManagerRecord},
|
||||||
};
|
};
|
||||||
use common_utils::{ext_traits::StringExt, static_cache::StaticCache, types as common_utils_types};
|
use common_utils::{ext_traits::StringExt, types as common_utils_types};
|
||||||
use error_stack::{self, ResultExt};
|
use error_stack::{self, ResultExt};
|
||||||
use euclid::{
|
use euclid::{
|
||||||
backend,
|
backend,
|
||||||
backend::{inputs as dsl_inputs, EuclidBackend},
|
backend::{inputs as dsl_inputs, EuclidBackend},
|
||||||
};
|
};
|
||||||
use router_env::{instrument, tracing};
|
use router_env::{instrument, tracing};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use storage_impl::redis::cache::{self, SURCHARGE_CACHE};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
core::{
|
core::{
|
||||||
errors::ConditionalConfigError as ConfigError,
|
errors::{self, ConditionalConfigError as ConfigError},
|
||||||
payments::{
|
payments::{
|
||||||
conditional_configs::ConditionalConfigResult, routing::make_dsl_input_for_surcharge,
|
conditional_configs::ConditionalConfigResult, routing::make_dsl_input_for_surcharge,
|
||||||
types, PaymentData,
|
types, PaymentData,
|
||||||
@ -29,9 +29,8 @@ use crate::{
|
|||||||
SessionState,
|
SessionState,
|
||||||
};
|
};
|
||||||
|
|
||||||
static CONF_CACHE: StaticCache<VirInterpreterBackendCacheWrapper> = StaticCache::new();
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct VirInterpreterBackendCacheWrapper {
|
||||||
struct VirInterpreterBackendCacheWrapper {
|
|
||||||
cached_algorithm: backend::VirInterpreterBackend<SurchargeDecisionConfigs>,
|
cached_algorithm: backend::VirInterpreterBackend<SurchargeDecisionConfigs>,
|
||||||
merchant_surcharge_configs: surcharge_decision_configs::MerchantSurchargeConfigs,
|
merchant_surcharge_configs: surcharge_decision_configs::MerchantSurchargeConfigs,
|
||||||
}
|
}
|
||||||
@ -53,7 +52,7 @@ impl TryFrom<SurchargeDecisionManagerRecord> for VirInterpreterBackendCacheWrapp
|
|||||||
|
|
||||||
enum SurchargeSource {
|
enum SurchargeSource {
|
||||||
/// Surcharge will be generated through the surcharge rules
|
/// Surcharge will be generated through the surcharge rules
|
||||||
Generate(Arc<VirInterpreterBackendCacheWrapper>),
|
Generate(VirInterpreterBackendCacheWrapper),
|
||||||
/// Surcharge is predefined by the merchant through payment create request
|
/// Surcharge is predefined by the merchant through payment create request
|
||||||
Predetermined(payments::RequestSurchargeDetails),
|
Predetermined(payments::RequestSurchargeDetails),
|
||||||
}
|
}
|
||||||
@ -116,19 +115,13 @@ pub async fn perform_surcharge_decision_management_for_payment_method_list(
|
|||||||
surcharge_decision_configs::MerchantSurchargeConfigs::default(),
|
surcharge_decision_configs::MerchantSurchargeConfigs::default(),
|
||||||
),
|
),
|
||||||
(None, Some(algorithm_id)) => {
|
(None, Some(algorithm_id)) => {
|
||||||
let key = ensure_algorithm_cached(
|
let cached_algo = ensure_algorithm_cached(
|
||||||
&*state.store,
|
&*state.store,
|
||||||
&payment_attempt.merchant_id,
|
&payment_attempt.merchant_id,
|
||||||
algorithm_ref.timestamp,
|
|
||||||
algorithm_id.as_str(),
|
algorithm_id.as_str(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let cached_algo = CONF_CACHE
|
|
||||||
.retrieve(&key)
|
|
||||||
.change_context(ConfigError::CacheMiss)
|
|
||||||
.attach_printable(
|
|
||||||
"Unable to retrieve cached routing algorithm even after refresh",
|
|
||||||
)?;
|
|
||||||
let merchant_surcharge_config = cached_algo.merchant_surcharge_configs.clone();
|
let merchant_surcharge_config = cached_algo.merchant_surcharge_configs.clone();
|
||||||
(
|
(
|
||||||
SurchargeSource::Generate(cached_algo),
|
SurchargeSource::Generate(cached_algo),
|
||||||
@ -233,19 +226,13 @@ where
|
|||||||
SurchargeSource::Predetermined(request_surcharge_details)
|
SurchargeSource::Predetermined(request_surcharge_details)
|
||||||
}
|
}
|
||||||
(None, Some(algorithm_id)) => {
|
(None, Some(algorithm_id)) => {
|
||||||
let key = ensure_algorithm_cached(
|
let cached_algo = ensure_algorithm_cached(
|
||||||
&*state.store,
|
&*state.store,
|
||||||
&payment_data.payment_attempt.merchant_id,
|
&payment_data.payment_attempt.merchant_id,
|
||||||
algorithm_ref.timestamp,
|
|
||||||
algorithm_id.as_str(),
|
algorithm_id.as_str(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let cached_algo = CONF_CACHE
|
|
||||||
.retrieve(&key)
|
|
||||||
.change_context(ConfigError::CacheMiss)
|
|
||||||
.attach_printable(
|
|
||||||
"Unable to retrieve cached routing algorithm even after refresh",
|
|
||||||
)?;
|
|
||||||
SurchargeSource::Generate(cached_algo)
|
SurchargeSource::Generate(cached_algo)
|
||||||
}
|
}
|
||||||
(None, None) => return Ok(surcharge_metadata),
|
(None, None) => return Ok(surcharge_metadata),
|
||||||
@ -291,19 +278,13 @@ pub async fn perform_surcharge_decision_management_for_saved_cards(
|
|||||||
SurchargeSource::Predetermined(request_surcharge_details)
|
SurchargeSource::Predetermined(request_surcharge_details)
|
||||||
}
|
}
|
||||||
(None, Some(algorithm_id)) => {
|
(None, Some(algorithm_id)) => {
|
||||||
let key = ensure_algorithm_cached(
|
let cached_algo = ensure_algorithm_cached(
|
||||||
&*state.store,
|
&*state.store,
|
||||||
&payment_attempt.merchant_id,
|
&payment_attempt.merchant_id,
|
||||||
algorithm_ref.timestamp,
|
|
||||||
algorithm_id.as_str(),
|
algorithm_id.as_str(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let cached_algo = CONF_CACHE
|
|
||||||
.retrieve(&key)
|
|
||||||
.change_context(ConfigError::CacheMiss)
|
|
||||||
.attach_printable(
|
|
||||||
"Unable to retrieve cached routing algorithm even after refresh",
|
|
||||||
)?;
|
|
||||||
SurchargeSource::Generate(cached_algo)
|
SurchargeSource::Generate(cached_algo)
|
||||||
}
|
}
|
||||||
(None, None) => return Ok(surcharge_metadata),
|
(None, None) => return Ok(surcharge_metadata),
|
||||||
@ -388,48 +369,31 @@ fn get_surcharge_details_from_surcharge_output(
|
|||||||
pub async fn ensure_algorithm_cached(
|
pub async fn ensure_algorithm_cached(
|
||||||
store: &dyn StorageInterface,
|
store: &dyn StorageInterface,
|
||||||
merchant_id: &str,
|
merchant_id: &str,
|
||||||
timestamp: i64,
|
|
||||||
algorithm_id: &str,
|
algorithm_id: &str,
|
||||||
) -> ConditionalConfigResult<String> {
|
) -> ConditionalConfigResult<VirInterpreterBackendCacheWrapper> {
|
||||||
let key = format!("surcharge_dsl_{merchant_id}");
|
let key = format!("surcharge_dsl_{merchant_id}");
|
||||||
let present = CONF_CACHE
|
|
||||||
.present(&key)
|
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
|
||||||
.attach_printable("Error checking presence of DSL")?;
|
|
||||||
let expired = CONF_CACHE
|
|
||||||
.expired(&key, timestamp)
|
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
|
||||||
.attach_printable("Error checking presence of DSL")?;
|
|
||||||
|
|
||||||
if !present || expired {
|
let value_to_cache = || async {
|
||||||
refresh_surcharge_algorithm_cache(store, key.clone(), algorithm_id, timestamp).await?
|
let config: diesel_models::Config = store.find_config_by_key(algorithm_id).await?;
|
||||||
}
|
|
||||||
Ok(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
|
||||||
pub async fn refresh_surcharge_algorithm_cache(
|
|
||||||
store: &dyn StorageInterface,
|
|
||||||
key: String,
|
|
||||||
algorithm_id: &str,
|
|
||||||
timestamp: i64,
|
|
||||||
) -> ConditionalConfigResult<()> {
|
|
||||||
let config = store
|
|
||||||
.find_config_by_key(algorithm_id)
|
|
||||||
.await
|
|
||||||
.change_context(ConfigError::DslMissingInDb)
|
|
||||||
.attach_printable("Error parsing DSL from config")?;
|
|
||||||
let record: SurchargeDecisionManagerRecord = config
|
let record: SurchargeDecisionManagerRecord = config
|
||||||
.config
|
.config
|
||||||
.parse_struct("Program")
|
.parse_struct("Program")
|
||||||
.change_context(ConfigError::DslParsingError)
|
.change_context(errors::StorageError::DeserializationFailed)
|
||||||
.attach_printable("Error parsing routing algorithm from configs")?;
|
.attach_printable("Error parsing routing algorithm from configs")?;
|
||||||
let value_to_cache = VirInterpreterBackendCacheWrapper::try_from(record)?;
|
VirInterpreterBackendCacheWrapper::try_from(record)
|
||||||
CONF_CACHE
|
.change_context(errors::StorageError::ValueNotFound("Program".to_string()))
|
||||||
.save(key, value_to_cache, timestamp)
|
.attach_printable("Error initializing DSL interpreter backend")
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
};
|
||||||
.attach_printable("Error saving DSL to cache")?;
|
let interpreter = cache::get_or_populate_in_memory(
|
||||||
Ok(())
|
store.get_cache_store().as_ref(),
|
||||||
|
&key,
|
||||||
|
value_to_cache,
|
||||||
|
&SURCHARGE_CACHE,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ConfigError::CacheMiss)
|
||||||
|
.attach_printable("Unable to retrieve cached routing algorithm even after refresh")?;
|
||||||
|
Ok(interpreter)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute_dsl_and_get_conditional_config(
|
pub fn execute_dsl_and_get_conditional_config(
|
||||||
|
|||||||
@ -4,19 +4,17 @@ use api_models::{
|
|||||||
conditional_configs::{ConditionalConfigs, DecisionManagerRecord},
|
conditional_configs::{ConditionalConfigs, DecisionManagerRecord},
|
||||||
routing,
|
routing,
|
||||||
};
|
};
|
||||||
use common_utils::{ext_traits::StringExt, static_cache::StaticCache};
|
use common_utils::ext_traits::StringExt;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use euclid::backend::{self, inputs as dsl_inputs, EuclidBackend};
|
use euclid::backend::{self, inputs as dsl_inputs, EuclidBackend};
|
||||||
use router_env::{instrument, tracing};
|
use router_env::{instrument, tracing};
|
||||||
|
use storage_impl::redis::cache::{self, DECISION_MANAGER_CACHE};
|
||||||
|
|
||||||
use super::routing::make_dsl_input;
|
use super::routing::make_dsl_input;
|
||||||
use crate::{
|
use crate::{
|
||||||
core::{errors, errors::ConditionalConfigError as ConfigError, payments},
|
core::{errors, errors::ConditionalConfigError as ConfigError, payments},
|
||||||
routes,
|
routes,
|
||||||
};
|
};
|
||||||
|
|
||||||
static CONF_CACHE: StaticCache<backend::VirInterpreterBackend<ConditionalConfigs>> =
|
|
||||||
StaticCache::new();
|
|
||||||
pub type ConditionalConfigResult<O> = errors::CustomResult<O, ConfigError>;
|
pub type ConditionalConfigResult<O> = errors::CustomResult<O, ConfigError>;
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
@ -31,76 +29,40 @@ pub async fn perform_decision_management<F: Clone>(
|
|||||||
} else {
|
} else {
|
||||||
return Ok(ConditionalConfigs::default());
|
return Ok(ConditionalConfigs::default());
|
||||||
};
|
};
|
||||||
|
let db = &*state.store;
|
||||||
|
|
||||||
let key = ensure_algorithm_cached(
|
|
||||||
state,
|
|
||||||
merchant_id,
|
|
||||||
algorithm_ref.timestamp,
|
|
||||||
algorithm_id.as_str(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
let cached_algo = CONF_CACHE
|
|
||||||
.retrieve(&key)
|
|
||||||
.change_context(ConfigError::CacheMiss)
|
|
||||||
.attach_printable("Unable to retrieve cached routing algorithm even after refresh")?;
|
|
||||||
let backend_input =
|
|
||||||
make_dsl_input(payment_data).change_context(ConfigError::InputConstructionError)?;
|
|
||||||
let interpreter = cached_algo.as_ref();
|
|
||||||
execute_dsl_and_get_conditional_config(backend_input, interpreter).await
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
|
||||||
pub async fn ensure_algorithm_cached(
|
|
||||||
state: &routes::SessionState,
|
|
||||||
merchant_id: &str,
|
|
||||||
timestamp: i64,
|
|
||||||
algorithm_id: &str,
|
|
||||||
) -> ConditionalConfigResult<String> {
|
|
||||||
let key = format!("dsl_{merchant_id}");
|
let key = format!("dsl_{merchant_id}");
|
||||||
let present = CONF_CACHE
|
|
||||||
.present(&key)
|
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
|
||||||
.attach_printable("Error checking presece of DSL")?;
|
|
||||||
let expired = CONF_CACHE
|
|
||||||
.expired(&key, timestamp)
|
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
|
||||||
.attach_printable("Error checking presence of DSL")?;
|
|
||||||
if !present || expired {
|
|
||||||
refresh_routing_cache(state, key.clone(), algorithm_id, timestamp).await?;
|
|
||||||
};
|
|
||||||
Ok(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
let find_key_from_db = || async {
|
||||||
pub async fn refresh_routing_cache(
|
let config = db.find_config_by_key(&algorithm_id).await?;
|
||||||
state: &routes::SessionState,
|
|
||||||
key: String,
|
|
||||||
algorithm_id: &str,
|
|
||||||
timestamp: i64,
|
|
||||||
) -> ConditionalConfigResult<()> {
|
|
||||||
let config = state
|
|
||||||
.store
|
|
||||||
.find_config_by_key(algorithm_id)
|
|
||||||
.await
|
|
||||||
.change_context(ConfigError::DslMissingInDb)
|
|
||||||
.attach_printable("Error parsing DSL from config")?;
|
|
||||||
let rec: DecisionManagerRecord = config
|
let rec: DecisionManagerRecord = config
|
||||||
.config
|
.config
|
||||||
.parse_struct("Program")
|
.parse_struct("Program")
|
||||||
.change_context(ConfigError::DslParsingError)
|
.change_context(errors::StorageError::DeserializationFailed)
|
||||||
.attach_printable("Error parsing routing algorithm from configs")?;
|
.attach_printable("Error parsing routing algorithm from configs")?;
|
||||||
let interpreter: backend::VirInterpreterBackend<ConditionalConfigs> =
|
|
||||||
backend::VirInterpreterBackend::with_program(rec.program)
|
backend::VirInterpreterBackend::with_program(rec.program)
|
||||||
.change_context(ConfigError::DslBackendInitError)
|
.change_context(errors::StorageError::ValueNotFound("Program".to_string()))
|
||||||
.attach_printable("Error initializing DSL interpreter backend")?;
|
.attach_printable("Error initializing DSL interpreter backend")
|
||||||
CONF_CACHE
|
};
|
||||||
.save(key, interpreter, timestamp)
|
|
||||||
.change_context(ConfigError::DslCachePoisoned)
|
let interpreter = cache::get_or_populate_in_memory(
|
||||||
.attach_printable("Error saving DSL to cache")?;
|
db.get_cache_store().as_ref(),
|
||||||
Ok(())
|
&key,
|
||||||
|
find_key_from_db,
|
||||||
|
&DECISION_MANAGER_CACHE,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.change_context(ConfigError::DslCachePoisoned)?;
|
||||||
|
|
||||||
|
let backend_input =
|
||||||
|
make_dsl_input(payment_data).change_context(ConfigError::InputConstructionError)?;
|
||||||
|
|
||||||
|
execute_dsl_and_get_conditional_config(backend_input, &interpreter)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn execute_dsl_and_get_conditional_config(
|
pub fn execute_dsl_and_get_conditional_config(
|
||||||
backend_input: dsl_inputs::BackendInput,
|
backend_input: dsl_inputs::BackendInput,
|
||||||
interpreter: &backend::VirInterpreterBackend<ConditionalConfigs>,
|
interpreter: &backend::VirInterpreterBackend<ConditionalConfigs>,
|
||||||
) -> ConditionalConfigResult<ConditionalConfigs> {
|
) -> ConditionalConfigResult<ConditionalConfigs> {
|
||||||
|
|||||||
@ -558,9 +558,9 @@ pub async fn get_merchant_cgraph<'a>(
|
|||||||
|
|
||||||
#[cfg(not(feature = "business_profile_routing"))]
|
#[cfg(not(feature = "business_profile_routing"))]
|
||||||
let key = match transaction_type {
|
let key = match transaction_type {
|
||||||
api_enums::TransactionType::Payment => format!("kgraph_{}", merchant_id),
|
api_enums::TransactionType::Payment => format!("cgraph_{}", merchant_id),
|
||||||
#[cfg(feature = "payouts")]
|
#[cfg(feature = "payouts")]
|
||||||
api_enums::TransactionType::Payout => format!("kgraph_po_{}", merchant_id),
|
api_enums::TransactionType::Payout => format!("cgraph_po_{}", merchant_id),
|
||||||
};
|
};
|
||||||
|
|
||||||
let cached_cgraph = CGRAPH_CACHE
|
let cached_cgraph = CGRAPH_CACHE
|
||||||
|
|||||||
@ -15,6 +15,8 @@ use diesel_models::configs;
|
|||||||
use diesel_models::routing_algorithm::RoutingAlgorithm;
|
use diesel_models::routing_algorithm::RoutingAlgorithm;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use rustc_hash::FxHashSet;
|
use rustc_hash::FxHashSet;
|
||||||
|
#[cfg(not(feature = "business_profile_routing"))]
|
||||||
|
use storage_impl::redis::cache;
|
||||||
|
|
||||||
use super::payments;
|
use super::payments;
|
||||||
#[cfg(feature = "payouts")]
|
#[cfg(feature = "payouts")]
|
||||||
@ -232,7 +234,11 @@ pub async fn create_routing_config(
|
|||||||
if records_are_empty {
|
if records_are_empty {
|
||||||
merchant_dictionary.active_id = Some(algorithm_id.clone());
|
merchant_dictionary.active_id = Some(algorithm_id.clone());
|
||||||
algorithm_ref.update_algorithm_id(algorithm_id);
|
algorithm_ref.update_algorithm_id(algorithm_id);
|
||||||
helpers::update_merchant_active_algorithm_ref(db, &key_store, algorithm_ref).await?;
|
let key =
|
||||||
|
cache::CacheKind::Routing(format!("dsl_{}", &merchant_account.merchant_id).into());
|
||||||
|
|
||||||
|
helpers::update_merchant_active_algorithm_ref(db, &key_store, key, algorithm_ref)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
helpers::update_merchant_routing_dictionary(
|
helpers::update_merchant_routing_dictionary(
|
||||||
@ -363,7 +369,9 @@ pub async fn link_routing_config(
|
|||||||
merchant_dictionary,
|
merchant_dictionary,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
helpers::update_merchant_active_algorithm_ref(db, &key_store, routing_ref).await?;
|
let key =
|
||||||
|
cache::CacheKind::Routing(format!("dsl_{}", &merchant_account.merchant_id).into());
|
||||||
|
helpers::update_merchant_active_algorithm_ref(db, &key_store, key, routing_ref).await?;
|
||||||
|
|
||||||
metrics::ROUTING_LINK_CONFIG_SUCCESS_RESPONSE.add(&metrics::CONTEXT, 1, &[]);
|
metrics::ROUTING_LINK_CONFIG_SUCCESS_RESPONSE.add(&metrics::CONTEXT, 1, &[]);
|
||||||
Ok(service_api::ApplicationResponse::Json(response))
|
Ok(service_api::ApplicationResponse::Json(response))
|
||||||
|
|||||||
@ -188,6 +188,7 @@ pub async fn update_routing_algorithm(
|
|||||||
pub async fn update_merchant_active_algorithm_ref(
|
pub async fn update_merchant_active_algorithm_ref(
|
||||||
db: &dyn StorageInterface,
|
db: &dyn StorageInterface,
|
||||||
key_store: &domain::MerchantKeyStore,
|
key_store: &domain::MerchantKeyStore,
|
||||||
|
config_key: cache::CacheKind<'_>,
|
||||||
algorithm_id: routing_types::RoutingAlgorithmRef,
|
algorithm_id: routing_types::RoutingAlgorithmRef,
|
||||||
) -> RouterResult<()> {
|
) -> RouterResult<()> {
|
||||||
let ref_value = algorithm_id
|
let ref_value = algorithm_id
|
||||||
@ -226,6 +227,11 @@ pub async fn update_merchant_active_algorithm_ref(
|
|||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update routing algorithm ref in merchant account")?;
|
.attach_printable("Failed to update routing algorithm ref in merchant account")?;
|
||||||
|
|
||||||
|
cache::publish_into_redact_channel(db.get_cache_store().as_ref(), [config_key])
|
||||||
|
.await
|
||||||
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
|
.attach_printable("Failed to invalidate the config cache")?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -9,6 +9,7 @@ use common_utils::ext_traits::{Encode, StringExt, ValueExt};
|
|||||||
use diesel_models::configs;
|
use diesel_models::configs;
|
||||||
use error_stack::ResultExt;
|
use error_stack::ResultExt;
|
||||||
use euclid::frontend::ast;
|
use euclid::frontend::ast;
|
||||||
|
use storage_impl::redis::cache;
|
||||||
|
|
||||||
use super::routing::helpers::{
|
use super::routing::helpers::{
|
||||||
get_payment_method_surcharge_routing_id, update_merchant_active_algorithm_ref,
|
get_payment_method_surcharge_routing_id, update_merchant_active_algorithm_ref,
|
||||||
@ -88,8 +89,9 @@ pub async fn upsert_surcharge_decision_config(
|
|||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Error serializing the config")?;
|
.attach_printable("Error serializing the config")?;
|
||||||
|
|
||||||
algo_id.update_surcharge_config_id(key);
|
algo_id.update_surcharge_config_id(key.clone());
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::Surcharge(key.into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update routing algorithm ref")?;
|
.attach_printable("Failed to update routing algorithm ref")?;
|
||||||
@ -124,8 +126,9 @@ pub async fn upsert_surcharge_decision_config(
|
|||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Error fetching the config")?;
|
.attach_printable("Error fetching the config")?;
|
||||||
|
|
||||||
algo_id.update_surcharge_config_id(key);
|
algo_id.update_surcharge_config_id(key.clone());
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::Surcharge(key.clone().into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update routing algorithm ref")?;
|
.attach_printable("Failed to update routing algorithm ref")?;
|
||||||
@ -154,7 +157,8 @@ pub async fn delete_surcharge_decision_config(
|
|||||||
.attach_printable("Could not decode the surcharge conditional_config algorithm")?
|
.attach_printable("Could not decode the surcharge conditional_config algorithm")?
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
algo_id.surcharge_config_algo_id = None;
|
algo_id.surcharge_config_algo_id = None;
|
||||||
update_merchant_active_algorithm_ref(db, &key_store, algo_id)
|
let config_key = cache::CacheKind::Surcharge(key.clone().into());
|
||||||
|
update_merchant_active_algorithm_ref(db, &key_store, config_key, algo_id)
|
||||||
.await
|
.await
|
||||||
.change_context(errors::ApiErrorResponse::InternalServerError)
|
.change_context(errors::ApiErrorResponse::InternalServerError)
|
||||||
.attach_printable("Failed to update deleted algorithm ref")?;
|
.attach_printable("Failed to update deleted algorithm ref")?;
|
||||||
|
|||||||
@ -514,10 +514,10 @@ async fn publish_and_redact_merchant_account_cache(
|
|||||||
.map(|publishable_key| CacheKind::Accounts(publishable_key.into()));
|
.map(|publishable_key| CacheKind::Accounts(publishable_key.into()));
|
||||||
|
|
||||||
#[cfg(feature = "business_profile_routing")]
|
#[cfg(feature = "business_profile_routing")]
|
||||||
let kgraph_key = merchant_account.default_profile.as_ref().map(|profile_id| {
|
let cgraph_key = merchant_account.default_profile.as_ref().map(|profile_id| {
|
||||||
CacheKind::CGraph(
|
CacheKind::CGraph(
|
||||||
format!(
|
format!(
|
||||||
"kgraph_{}_{}",
|
"cgraph_{}_{}",
|
||||||
merchant_account.merchant_id.clone(),
|
merchant_account.merchant_id.clone(),
|
||||||
profile_id,
|
profile_id,
|
||||||
)
|
)
|
||||||
@ -526,8 +526,8 @@ async fn publish_and_redact_merchant_account_cache(
|
|||||||
});
|
});
|
||||||
|
|
||||||
#[cfg(not(feature = "business_profile_routing"))]
|
#[cfg(not(feature = "business_profile_routing"))]
|
||||||
let kgraph_key = Some(CacheKind::CGraph(
|
let cgraph_key = Some(CacheKind::CGraph(
|
||||||
format!("kgraph_{}", merchant_account.merchant_id.clone()).into(),
|
format!("cgraph_{}", merchant_account.merchant_id.clone()).into(),
|
||||||
));
|
));
|
||||||
|
|
||||||
let mut cache_keys = vec![CacheKind::Accounts(
|
let mut cache_keys = vec![CacheKind::Accounts(
|
||||||
@ -535,7 +535,7 @@ async fn publish_and_redact_merchant_account_cache(
|
|||||||
)];
|
)];
|
||||||
|
|
||||||
cache_keys.extend(publishable_key.into_iter());
|
cache_keys.extend(publishable_key.into_iter());
|
||||||
cache_keys.extend(kgraph_key.into_iter());
|
cache_keys.extend(cgraph_key.into_iter());
|
||||||
|
|
||||||
cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?;
|
cache::publish_into_redact_channel(store.get_cache_store().as_ref(), cache_keys).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@ -28,6 +28,12 @@ const ACCOUNTS_CACHE_PREFIX: &str = "accounts";
|
|||||||
/// Prefix for routing cache key
|
/// Prefix for routing cache key
|
||||||
const ROUTING_CACHE_PREFIX: &str = "routing";
|
const ROUTING_CACHE_PREFIX: &str = "routing";
|
||||||
|
|
||||||
|
/// Prefix for three ds decision manager cache key
|
||||||
|
const DECISION_MANAGER_CACHE_PREFIX: &str = "decision_manager";
|
||||||
|
|
||||||
|
/// Prefix for surcharge cache key
|
||||||
|
const SURCHARGE_CACHE_PREFIX: &str = "surcharge";
|
||||||
|
|
||||||
/// Prefix for cgraph cache key
|
/// Prefix for cgraph cache key
|
||||||
const CGRAPH_CACHE_PREFIX: &str = "cgraph";
|
const CGRAPH_CACHE_PREFIX: &str = "cgraph";
|
||||||
|
|
||||||
@ -57,6 +63,14 @@ pub static ACCOUNTS_CACHE: Lazy<Cache> =
|
|||||||
pub static ROUTING_CACHE: Lazy<Cache> =
|
pub static ROUTING_CACHE: Lazy<Cache> =
|
||||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||||
|
|
||||||
|
/// 3DS Decision Manager Cache
|
||||||
|
pub static DECISION_MANAGER_CACHE: Lazy<Cache> =
|
||||||
|
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||||
|
|
||||||
|
/// Surcharge Cache
|
||||||
|
pub static SURCHARGE_CACHE: Lazy<Cache> =
|
||||||
|
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||||
|
|
||||||
/// CGraph Cache
|
/// CGraph Cache
|
||||||
pub static CGRAPH_CACHE: Lazy<Cache> =
|
pub static CGRAPH_CACHE: Lazy<Cache> =
|
||||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||||
@ -74,6 +88,8 @@ pub enum CacheKind<'a> {
|
|||||||
Config(Cow<'a, str>),
|
Config(Cow<'a, str>),
|
||||||
Accounts(Cow<'a, str>),
|
Accounts(Cow<'a, str>),
|
||||||
Routing(Cow<'a, str>),
|
Routing(Cow<'a, str>),
|
||||||
|
DecisionManager(Cow<'a, str>),
|
||||||
|
Surcharge(Cow<'a, str>),
|
||||||
CGraph(Cow<'a, str>),
|
CGraph(Cow<'a, str>),
|
||||||
PmFiltersCGraph(Cow<'a, str>),
|
PmFiltersCGraph(Cow<'a, str>),
|
||||||
All(Cow<'a, str>),
|
All(Cow<'a, str>),
|
||||||
@ -85,6 +101,8 @@ impl<'a> From<CacheKind<'a>> for RedisValue {
|
|||||||
CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"),
|
CacheKind::Config(s) => format!("{CONFIG_CACHE_PREFIX},{s}"),
|
||||||
CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"),
|
CacheKind::Accounts(s) => format!("{ACCOUNTS_CACHE_PREFIX},{s}"),
|
||||||
CacheKind::Routing(s) => format!("{ROUTING_CACHE_PREFIX},{s}"),
|
CacheKind::Routing(s) => format!("{ROUTING_CACHE_PREFIX},{s}"),
|
||||||
|
CacheKind::DecisionManager(s) => format!("{DECISION_MANAGER_CACHE_PREFIX},{s}"),
|
||||||
|
CacheKind::Surcharge(s) => format!("{SURCHARGE_CACHE_PREFIX},{s}"),
|
||||||
CacheKind::CGraph(s) => format!("{CGRAPH_CACHE_PREFIX},{s}"),
|
CacheKind::CGraph(s) => format!("{CGRAPH_CACHE_PREFIX},{s}"),
|
||||||
CacheKind::PmFiltersCGraph(s) => format!("{PM_FILTERS_CGRAPH_CACHE_PREFIX},{s}"),
|
CacheKind::PmFiltersCGraph(s) => format!("{PM_FILTERS_CGRAPH_CACHE_PREFIX},{s}"),
|
||||||
CacheKind::All(s) => format!("{ALL_CACHE_PREFIX},{s}"),
|
CacheKind::All(s) => format!("{ALL_CACHE_PREFIX},{s}"),
|
||||||
@ -105,10 +123,15 @@ impl<'a> TryFrom<RedisValue> for CacheKind<'a> {
|
|||||||
ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))),
|
ACCOUNTS_CACHE_PREFIX => Ok(Self::Accounts(Cow::Owned(split.1.to_string()))),
|
||||||
CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))),
|
CONFIG_CACHE_PREFIX => Ok(Self::Config(Cow::Owned(split.1.to_string()))),
|
||||||
ROUTING_CACHE_PREFIX => Ok(Self::Routing(Cow::Owned(split.1.to_string()))),
|
ROUTING_CACHE_PREFIX => Ok(Self::Routing(Cow::Owned(split.1.to_string()))),
|
||||||
|
DECISION_MANAGER_CACHE_PREFIX => {
|
||||||
|
Ok(Self::DecisionManager(Cow::Owned(split.1.to_string())))
|
||||||
|
}
|
||||||
|
SURCHARGE_CACHE_PREFIX => Ok(Self::Surcharge(Cow::Owned(split.1.to_string()))),
|
||||||
CGRAPH_CACHE_PREFIX => Ok(Self::CGraph(Cow::Owned(split.1.to_string()))),
|
CGRAPH_CACHE_PREFIX => Ok(Self::CGraph(Cow::Owned(split.1.to_string()))),
|
||||||
PM_FILTERS_CGRAPH_CACHE_PREFIX => {
|
PM_FILTERS_CGRAPH_CACHE_PREFIX => {
|
||||||
Ok(Self::PmFiltersCGraph(Cow::Owned(split.1.to_string())))
|
Ok(Self::PmFiltersCGraph(Cow::Owned(split.1.to_string())))
|
||||||
}
|
}
|
||||||
|
|
||||||
ALL_CACHE_PREFIX => Ok(Self::All(Cow::Owned(split.1.to_string()))),
|
ALL_CACHE_PREFIX => Ok(Self::All(Cow::Owned(split.1.to_string()))),
|
||||||
_ => Err(validation_err.into()),
|
_ => Err(validation_err.into()),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,8 +3,8 @@ use redis_interface::{errors as redis_errors, PubsubInterface, RedisValue};
|
|||||||
use router_env::{logger, tracing::Instrument};
|
use router_env::{logger, tracing::Instrument};
|
||||||
|
|
||||||
use crate::redis::cache::{
|
use crate::redis::cache::{
|
||||||
CacheKey, CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, PM_FILTERS_CGRAPH_CACHE,
|
CacheKey, CacheKind, ACCOUNTS_CACHE, CGRAPH_CACHE, CONFIG_CACHE, DECISION_MANAGER_CACHE,
|
||||||
ROUTING_CACHE,
|
PM_FILTERS_CGRAPH_CACHE, ROUTING_CACHE, SURCHARGE_CACHE,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -119,6 +119,24 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
|
|||||||
.await;
|
.await;
|
||||||
key
|
key
|
||||||
}
|
}
|
||||||
|
CacheKind::DecisionManager(key) => {
|
||||||
|
DECISION_MANAGER_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
|
CacheKind::Surcharge(key) => {
|
||||||
|
SURCHARGE_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
key
|
||||||
|
}
|
||||||
CacheKind::All(key) => {
|
CacheKind::All(key) => {
|
||||||
CONFIG_CACHE
|
CONFIG_CACHE
|
||||||
.remove(CacheKey {
|
.remove(CacheKey {
|
||||||
@ -150,6 +168,19 @@ impl PubSubInterface for std::sync::Arc<redis_interface::RedisConnectionPool> {
|
|||||||
prefix: self.key_prefix.clone(),
|
prefix: self.key_prefix.clone(),
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
DECISION_MANAGER_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
SURCHARGE_CACHE
|
||||||
|
.remove(CacheKey {
|
||||||
|
key: key.to_string(),
|
||||||
|
prefix: self.key_prefix.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
key
|
key
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user