mirror of
https://github.com/juspay/hyperswitch.git
synced 2025-10-29 00:49:42 +08:00
refactor: add basic counter metrics for IMC (#5006)
This commit is contained in:
@ -9,10 +9,14 @@ use error_stack::{Report, ResultExt};
|
||||
use moka::future::Cache as MokaCache;
|
||||
use once_cell::sync::Lazy;
|
||||
use redis_interface::{errors::RedisError, RedisConnectionPool, RedisValue};
|
||||
use router_env::tracing::{self, instrument};
|
||||
use router_env::{
|
||||
metrics::add_attributes,
|
||||
tracing::{self, instrument},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
errors::StorageError,
|
||||
metrics,
|
||||
redis::{PubSubInterface, RedisConnInterface},
|
||||
};
|
||||
|
||||
@ -53,31 +57,44 @@ const CACHE_TTI: u64 = 10 * 60;
|
||||
const MAX_CAPACITY: u64 = 30;
|
||||
|
||||
/// Config Cache with time_to_live as 30 mins and time_to_idle as 10 mins.
|
||||
pub static CONFIG_CACHE: Lazy<Cache> = Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, None));
|
||||
pub static CONFIG_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new("CONFIG_CACHE", CACHE_TTL, CACHE_TTI, None));
|
||||
|
||||
/// Accounts cache with time_to_live as 30 mins and size limit
|
||||
pub static ACCOUNTS_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
Lazy::new(|| Cache::new("ACCOUNTS_CACHE", CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
|
||||
/// Routing Cache
|
||||
pub static ROUTING_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
Lazy::new(|| Cache::new("ROUTING_CACHE", CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
|
||||
/// 3DS Decision Manager Cache
|
||||
pub static DECISION_MANAGER_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
pub static DECISION_MANAGER_CACHE: Lazy<Cache> = Lazy::new(|| {
|
||||
Cache::new(
|
||||
"DECISION_MANAGER_CACHE",
|
||||
CACHE_TTL,
|
||||
CACHE_TTI,
|
||||
Some(MAX_CAPACITY),
|
||||
)
|
||||
});
|
||||
|
||||
/// Surcharge Cache
|
||||
pub static SURCHARGE_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
Lazy::new(|| Cache::new("SURCHARGE_CACHE", CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
|
||||
/// CGraph Cache
|
||||
pub static CGRAPH_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
Lazy::new(|| Cache::new("CGRAPH_CACHE", CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
|
||||
/// PM Filter CGraph Cache
|
||||
pub static PM_FILTERS_CGRAPH_CACHE: Lazy<Cache> =
|
||||
Lazy::new(|| Cache::new(CACHE_TTL, CACHE_TTI, Some(MAX_CAPACITY)));
|
||||
pub static PM_FILTERS_CGRAPH_CACHE: Lazy<Cache> = Lazy::new(|| {
|
||||
Cache::new(
|
||||
"PM_FILTERS_CGRAPH_CACHE",
|
||||
CACHE_TTL,
|
||||
CACHE_TTI,
|
||||
Some(MAX_CAPACITY),
|
||||
)
|
||||
});
|
||||
|
||||
/// Trait which defines the behaviour of types that's gonna be stored in Cache
|
||||
pub trait Cacheable: Any + Send + Sync + DynClone {
|
||||
@ -150,6 +167,7 @@ where
|
||||
dyn_clone::clone_trait_object!(Cacheable);
|
||||
|
||||
pub struct Cache {
|
||||
name: &'static str,
|
||||
inner: MokaCache<String, Arc<dyn Cacheable>>,
|
||||
}
|
||||
|
||||
@ -173,19 +191,38 @@ impl From<CacheKey> for String {
|
||||
impl Cache {
|
||||
/// With given `time_to_live` and `time_to_idle` creates a moka cache.
|
||||
///
|
||||
/// `name` : Cache type name to be used as an attribute in metrics
|
||||
/// `time_to_live`: Time in seconds before an object is stored in a caching system before it’s deleted
|
||||
/// `time_to_idle`: Time in seconds before a `get` or `insert` operation an object is stored in a caching system before it's deleted
|
||||
/// `max_capacity`: Max size in MB's that the cache can hold
|
||||
pub fn new(time_to_live: u64, time_to_idle: u64, max_capacity: Option<u64>) -> Self {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
time_to_live: u64,
|
||||
time_to_idle: u64,
|
||||
max_capacity: Option<u64>,
|
||||
) -> Self {
|
||||
// Record the metrics of manual invalidation of cache entry by the application
|
||||
let eviction_listener = move |_, _, cause| {
|
||||
metrics::IN_MEMORY_CACHE_EVICTION_COUNT.add(
|
||||
&metrics::CONTEXT,
|
||||
1,
|
||||
&add_attributes([
|
||||
("cache_type", name.to_owned()),
|
||||
("removal_cause", format!("{:?}", cause)),
|
||||
]),
|
||||
);
|
||||
};
|
||||
let mut cache_builder = MokaCache::builder()
|
||||
.time_to_live(std::time::Duration::from_secs(time_to_live))
|
||||
.time_to_idle(std::time::Duration::from_secs(time_to_idle));
|
||||
.time_to_idle(std::time::Duration::from_secs(time_to_idle))
|
||||
.eviction_listener(eviction_listener);
|
||||
|
||||
if let Some(capacity) = max_capacity {
|
||||
cache_builder = cache_builder.max_capacity(capacity * 1024 * 1024);
|
||||
}
|
||||
|
||||
Self {
|
||||
name,
|
||||
inner: cache_builder.build(),
|
||||
}
|
||||
}
|
||||
@ -195,8 +232,26 @@ impl Cache {
|
||||
}
|
||||
|
||||
pub async fn get_val<T: Clone + Cacheable>(&self, key: CacheKey) -> Option<T> {
|
||||
let val = self.inner.get::<String>(&key.into()).await?;
|
||||
(*val).as_any().downcast_ref::<T>().cloned()
|
||||
let val = self.inner.get::<String>(&key.into()).await;
|
||||
|
||||
// Add cache hit and cache miss metrics
|
||||
if val.is_some() {
|
||||
metrics::IN_MEMORY_CACHE_HIT.add(
|
||||
&metrics::CONTEXT,
|
||||
1,
|
||||
&add_attributes([("cache_type", self.name)]),
|
||||
);
|
||||
} else {
|
||||
metrics::IN_MEMORY_CACHE_MISS.add(
|
||||
&metrics::CONTEXT,
|
||||
1,
|
||||
&add_attributes([("cache_type", self.name)]),
|
||||
);
|
||||
}
|
||||
|
||||
let val = (*val?).as_any().downcast_ref::<T>().cloned();
|
||||
|
||||
val
|
||||
}
|
||||
|
||||
/// Check if a key exists in cache
|
||||
@ -209,14 +264,28 @@ impl Cache {
|
||||
}
|
||||
|
||||
/// Performs any pending maintenance operations needed by the cache.
|
||||
pub async fn run_pending_tasks(&self) {
|
||||
async fn run_pending_tasks(&self) {
|
||||
self.inner.run_pending_tasks().await;
|
||||
}
|
||||
|
||||
/// Returns an approximate number of entries in this cache.
|
||||
pub fn get_entry_count(&self) -> u64 {
|
||||
fn get_entry_count(&self) -> u64 {
|
||||
self.inner.entry_count()
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &'static str {
|
||||
self.name
|
||||
}
|
||||
|
||||
pub async fn record_entry_count_metric(&self) {
|
||||
self.run_pending_tasks().await;
|
||||
|
||||
metrics::IN_MEMORY_CACHE_ENTRY_COUNT.observe(
|
||||
&metrics::CONTEXT,
|
||||
self.get_entry_count(),
|
||||
&add_attributes([("cache_type", self.name)]),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
@ -390,7 +459,7 @@ mod cache_tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn construct_and_get_cache() {
|
||||
let cache = Cache::new(1800, 1800, None);
|
||||
let cache = Cache::new("test", 1800, 1800, None);
|
||||
cache
|
||||
.push(
|
||||
CacheKey {
|
||||
@ -413,7 +482,7 @@ mod cache_tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn eviction_on_size_test() {
|
||||
let cache = Cache::new(2, 2, Some(0));
|
||||
let cache = Cache::new("test", 2, 2, Some(0));
|
||||
cache
|
||||
.push(
|
||||
CacheKey {
|
||||
@ -436,7 +505,7 @@ mod cache_tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalidate_cache_for_key() {
|
||||
let cache = Cache::new(1800, 1800, None);
|
||||
let cache = Cache::new("test", 1800, 1800, None);
|
||||
cache
|
||||
.push(
|
||||
CacheKey {
|
||||
@ -467,7 +536,7 @@ mod cache_tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn eviction_on_time_test() {
|
||||
let cache = Cache::new(2, 2, None);
|
||||
let cache = Cache::new("test", 2, 2, None);
|
||||
cache
|
||||
.push(
|
||||
CacheKey {
|
||||
|
||||
Reference in New Issue
Block a user